admin管理员组文章数量:1608628
【Spring Cloud】 CircuitBreaker 组件 resilience4j 一
- 前言
- resilience4j
- CircuitBreaker
- Bulkhead
- Bulkhead
- ThreadPoolBulkhead
- RateLimiter
- Retry
- TimeLimiter
- 总结
前言
Spring Cloud 2020
版本之后废弃了对大部分 Netflix
组件的支持,其中就包括最常用的 熔断
组件 Hystrix
,取而代之的就是 resilience4j
sentinel
Spring Retry
等,本文要介绍的就是 resilience4j
首先,resilience4j
本身就是一个十分优秀的组件,且功能十分丰富,包括 熔断
限流
限时
重试
等多种组件功能,因此本文大致分三个部分解读:
resilience4j
本身的组件功能及使用方法resilience4j-spring
,即resilience4j
组件在Spring
下的使用resilience4j
在SpringCloud
下的使用
值得注意的是,在 Spring 和 SpringCloud 下的使用并不是一回事,
SpringCloud 只是提供了一层 CircuitBreakerFactory 的封装,更多
“高级” 功能例如 基于配置文件配置 基于注解声明 都是由
resilience4j-spring(resilience4j-spring-boot2)提供
resilience4j
正如前面所说,resilience4j
是一个很优秀的三方组件,具有不限于 熔断
的其他多种功能,以组件的形式提供:
CircuitBreaker
:熔断TimeLimiter
:超时限制RateLimiter
:限流Bulkhead
(ThreadPoolBulkhead
):舱壁Retry
:重试- 其他
本章节不讨论上述组件的实现细节(因为我也不知道),仅打算以示例入手来掌握 resilience4j
,这样才能为之后与 SpringBoot
SpringCloud
的整合打好基础
该说不说,对比于 Hystrix 基于 RxJava2 的实现,resilience4j 的实现应该
相对容易看懂点
整体上,上述所有组件都是以 Register
-> Config
-> 实例
的形式进行管理,即 注册中心
基于 Config
来管理一个 组件实例
。当然,直接通过 Config
创建实例也是可以的,但是考虑到后续与 Spring
的整合,由容器管理一个统一的 Register实例
自然是相对契合的,因此本文就以这种模式进行示例
CircuitBreaker
首先简单聊一下 熔断
机制(纯口述,想了解的更严谨请查阅其他资料):
类似于保险丝,微服务间的通讯不能因为单方面的故障或者网络问题而阻塞,比如:服务 A
访问已经挂掉的服务 B
,在超时限制内,该请求线程会一直阻塞在这里(甚至还会重试),最终甚至影响到其他服务,造成 服务雪崩
此时对于这种情况,CircuitBreaker
提供以下能力:当请求失败次数(或者慢请求次数)超过阈值,该 “保险丝” 就会 断开,此后打到服务 B
的请求会直接被拒绝(当然也可以配置更加优雅的 “降级处理”)。同时,断开的 “保险丝” 在一定时间(配置)后进入 “半开” 状态,然后根据之后的服务状态选择再次 “断开” 或者 “关闭”
然后直接上代码,看看 resilience4j
的 CircuitBreaker
组件如何使用
@Test
public void test1() throws InterruptedException {
// config
CircuitBreakerConfig circuitBreakerConfig =
CircuitBreakerConfig
.custom()
// 至少4次请求开始生效,即
.minimumNumberOfCalls(4)
// 50% 的请求错误断路器就打开,即 4 * 50% = 2 次请求失败则熔断
.failureRateThreshold(50)
// 抛出 MyTestException 异常就算错误
.recordExceptions(MyTestException.class)
// 还有很多配置注入 慢请求、异常谓语判断 等等
.build();
// register
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.of(circuitBreakerConfig);
// instance
CircuitBreaker circuitBreaker =
circuitBreakerRegistry.circuitBreaker("test1");
// mock,会抛出 MyTestException 异常
HelloService mock = mock(HelloService.class);
given(mock.hello()).willThrow(new MyTestException());
// 由组件 circuitBreaker 装饰
Supplier<String> supplier = mock::hello;
Supplier decorateSupplier = circuitBreaker.decorateSupplier(supplier);
/**
* 请求 5 次, 前 4 次错误率 100% > 阈值 50%,第 5 次请求
* io.github.resilience4j.circuitbreaker.CallNotPermittedException:
* CircuitBreaker 'test1' is OPEN and does not permit further calls
*/
IntStream.range(0, 5).forEach(
i -> Try.ofSupplier(decorateSupplier)
.onSuccess(System.out::println)
.onFailure(System.out::println)
);
}
CircuitBreakerConfig
提供配置,示例中的配置:服务抛出MyTestException
即视为失败,以4
次请求为基线,当失败率超过50%
,则断路器断开CircuitBreakerRegistry
基于CircuitBreakerConfig
创建CircuitBreaker
实例,以name
唯一标识- 目标服务通过
mock
模拟,抛出MyTestException
CircuitBreaker
可以以Supplier
Callable
CompletionStage
等多种形式包装目标方法,示例中使用Supplier
- 示例中使用
Try.ofSupplier
执行最终请求,之后可以使用onSuccess
map
等对结果进行链式处理,当然也可以直接CircuitBreaker#executeXXX
执行最终请求 - 最终结果:服务会在第
5
次调用时熔断 - 之后的其他组件示例与此如出一辙,若无需要不再重复上述部分说明
Bulkhead
舱壁
组件,舱壁
是一个设计名词,即资源的隔离(起源于船舱上货物的隔离?)
之前介绍 熔断
机制介绍过,当目标服务宕机时,请求线程会一直被占用,假如所有的线程被资源都被阻塞在这里,那当前服务就死掉了,因此每个服务客户端的请求线程隔离是必须的,该功能由组件 Bulkhead
和 ThreadPoolBulkhead
提供
Bulkhead
:基于信号量的实现ThreadPoolBulkhead
:基于线程池的实现
直接上示例
Bulkhead
@Test
public void test() {
BulkheadConfig bulkheadConfig =
BulkheadConfig
.custom()
// 最多两个请求并行
.maxConcurrentCalls(2)
.build();
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.of(bulkheadConfig);
Bulkhead bulkhead = bulkheadRegistry.bulkhead("test1");
HelloService mock = mock(HelloService.class);
given(mock.hello()).willReturn("hello");
Supplier<String> supplier = mock::hello;
List result = new ArrayList<>();
IntStream.range(0, 4).forEach(
i -> CompletableFuture.runAsync(
() -> {
String temp;
try {
temp = bulkhead.executeSupplier(supplier);
} catch (Throwable e) {
temp = e.getMessage();
}
result.add(temp);
}
)
);
/**
* Awaitility.await,模拟并发调用
* <dependency>
* <groupId>org.awaitility</groupId>
* <artifactId>awaitility</artifactId>
* <version>4.0.2</version>
* <scope>test</scope>
* </dependency>
*/
await().atMost(1, TimeUnit.MINUTES).until(() -> result.size() == 4);
/**
* Bulkhead 'test1' is full and does not permit further calls
* Bulkhead 'test1' is full and does not permit further calls
* hello
* hello
*/
result.stream().forEach(System.out::println);
}
- 配置规则为最多允许
2
个并发请求,即信号量为2
- 此处借助一个很好用的三方依赖(坐标见示例)
Awaitility
处理异步请求 - 最终
4
个请求只有两个打到了目标服务上
ThreadPoolBulkhead
@Test
public void test() {
ThreadPoolBulkheadConfig threadPoolBulkheadConfig =
ThreadPoolBulkheadConfig
.custom()
// 等同于线程池配置
.maxThreadPoolSize(2)
.coreThreadPoolSize(2)
// 这里不能为 0,否则等同于无限队列
.queueCapacity(2)
.build();
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry =
ThreadPoolBulkheadRegistry.of(threadPoolBulkheadConfig);
ThreadPoolBulkhead threadPoolBulkhead =
threadPoolBulkheadRegistry.bulkhead("test1");
HelloService mock = mock(HelloService.class);
given(mock.hello()).willReturn("hello");
// ThreadPoolBulkhead 只能修饰 CompletionStage
Supplier<String> supplier = mock::hello;
Supplier<CompletableFuture> completableFutureSupplier = () ->
CompletableFuture.supplyAsync(supplier);
List result = new ArrayList<>();
IntStream.range(0, 6).forEach(
i -> CompletableFuture.runAsync(
() -> {
String temp;
try {
temp = threadPoolBulkhead.executeSupplier(supplier).toCompletableFuture().get();
} catch (Throwable e) {
temp = e.getMessage();
}
result.add(temp);
}
)
);
await().atMost(1, TimeUnit.MINUTES).until(() -> result.size() == 6);
/**
* maxThreadPoolSize(2) + queueCapacity(2)= 4 个请求成功
* Bulkhead 'test1' is full and does not permit further calls
* Bulkhead 'test1' is full and does not permit further calls
* hello
* hello
* hello
* hello
*/
result.stream().forEach(System.out::println);
}
- 值得注意的是:
ThreadPoolBulkhead
只能修饰的返回值类型为CompletionStage
- 一共发起
6
个请求,其中只有maxThreadPoolSize(2) + queueCapacity(2)= 4
个请求打到目标服务上
RateLimiter
RateLimiter
组件通过对系统时间切片进行限流操作,官方图解:
限流机制就不做过多解释了,直接看示例
@Test
public void test() {
RateLimiterConfig rateLimiterConfig =
RateLimiterConfig
.custom()
// 5s 内只允许一次请求打进来
.limitRefreshPeriod(Duration.ofSeconds(5))
.limitForPeriod(1)
// 被限流的请求不需要等待
.timeoutDuration(Duration.ZERO)
.build();
RateLimiterRegistry rateLimiterRegistry =
RateLimiterRegistry.of(rateLimiterConfig);
RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test1");
HelloService mock = mock(HelloService.class);
given(mock.hello()).willReturn("hello");
/**
* 第二次请求被限流
* io.github.resilience4j.ratelimiter.RequestNotPermitted:
* RateLimiter 'test1' does not permit further calls
*/
IntStream.range(0, 2).forEach(
i -> System.out.println(rateLimiter.executeSupplier(mock::hello))
);
}
- 示例中配置限流机制为:
5s
内只接收一次请求且目标任务不等待 - 因此第二次请求被拒绝
Retry
重试机制也不做解释,代码示例:
@Test
public void test() throws InterruptedException {
RetryConfig retryConfig =
RetryConfig
.custom()
// 抛出异常 MyTestException 即重试
.retryExceptions(MyTestException.class)
// 一共尝试 3 次
.maxAttempts(3)
// 每次间隔 50ms
.waitDuration(Duration.ofMillis(1000))
.build();
RetryRegistry retryRegistry = RetryRegistry.of(retryConfig);
Retry retry = retryRegistry.retry("test1");
Supplier<String> supplier = () -> {
System.out.println("hello");
throw new MyTestException();
};
/**
* hello
* hello
* hello
* com.xsn.test.demo.MyTestException
*/
retry.executeSupplier(supplier);
}
- 示例配置一共尝试
3
次,每次间隔1s
TimeLimiter
超时机制不做解释,代码示例:
@Test
public void test() {
TimeLimiterConfig timeLimiterConfig =
TimeLimiterConfig
.custom()
// 1s 即超时
.timeoutDuration(Duration.ofSeconds(1))
.build();
TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.of(timeLimiterConfig);
TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter("test1");
Supplier<CompletableFuture<String>> supplier = () -> {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
};
/**
* TimeLimiter 'test1' recorded a timeout exception.
*/
try {
String s = timeLimiter.executeFutureSupplier(supplier);
System.out.println(s);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
- 示例中超过
1s
视为超时,示例中延迟2s
TimeLimiter
能修饰类型为CompletionStage
或Future
,示例中为CompletableFuture
总结
如上,就是 resilience4j
的使用实例,可以看到使用起来还是比较麻烦的,一个服务倒好,但真正的微服务体系中我们需要对服务进行批量装饰,那纯靠手工 coding
必然是不行的,因此基于 resilience4j-spring
的注解声明和配置声明就显得十分重要了
下一篇:【Spring Cloud】解读 CircuitBreaker 组件 resilience4j 二
本文标签: 组件CloudSpringresilience4jCircuitBreaker
版权声明:本文标题:【Spring Cloud】 CircuitBreaker 组件 resilience4j 一 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1728549897a1163295.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论