与Hystrix相比,它有以下一些主要的区别:
- Hystrix调用必须被封装到HystrixCommand里,而resilience4j以装饰器的方式提供对函数式接口、lambda表达式等的嵌套装饰,因此你可以用简洁的方式组合多种高可用机制
- Hystrix的频次统计采用滑动窗口的方式,而resilience4j采用环状缓冲区的方式
- 关于熔断器在半开状态时的状态转换,Hystrix仅使用一次执行判定是否进行状态转换,而resilience4j则采用可配置的执行次数与阈值,来决定是否进行状态转换,这种方式提高了熔断机制的稳定性
- 关于隔离机制,Hystrix提供基于线程池和信号量的隔离,而resilience4j只提供基于信号量的隔离
熔断 CircuitBreaker
熔断器模式:微软提出来的一种模式:https://docs.microsoft.com/en-us/previous-versions/msp-n-p/dn589784(v=pandp.10)
初始化熔断器
CircuitBreakerRegistry负责创建和管理熔断器实例CircuitBreaker,它是线程安全的,提供原子性操作。
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
该方式使用默认的全局配置CircuitBreakerConfig创建熔断器实例,你也可以选择使用定制化的配置,可选项有:
- 触发熔断的失败率阈值
- 熔断器从打开状态到半开状态的等待时间
- 熔断器在半开状态时环状缓冲区的大小
- 熔断器在关闭状态时环状缓冲区的大小
- 处理熔断器事件的定制监听器CircuitBreakerEventListener
- 评估异常是否被记录为失败事件的定制谓词函数Predicate
代码如下:
maven:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>0.13.0</version>
</dependency>
初始化:
private void init() {
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)//触发熔断的失败率阈值,注意是失败率
.waitDurationInOpenState(Duration.ofMillis(1000)) //熔断器从打开状态到半开状态的等待时间
.ringBufferSizeInHalfOpenState(2)//熔断器在半开状态时环状缓冲区的大小
.ringBufferSizeInClosedState(2)//熔断器在关闭状态时环状缓冲区的大小
.recordFailure(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) {
return false;
}
})//评估异常是否被记录为失败事件的定制谓词函数Predicate
.build();
// Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(circuitBreakerConfig);
// 从注册中心获取使用默认配置的熔断器
CircuitBreaker circuitBreaker_default = circuitBreakerRegistry.circuitBreaker("default");
// 从注册中心获取使用定制化配置的熔断器
CircuitBreaker circuitBreaker_custom = circuitBreakerRegistry
.circuitBreaker("cunstom", circuitBreakerConfig);
// 你也可以选择不经过注册中心,直接创建熔断器实例:
CircuitBreaker defaultCircuitBreaker = CircuitBreaker.ofDefaults("direct_default");
CircuitBreaker customCircuitBreaker = CircuitBreaker.of("direct_custom", circuitBreakerConfig);
}
熔断器使用方式
熔断器采用装饰器模式,你可以使用CircuitBreaker.decorateCheckedSupplier() / CircuitBreaker.decorateCheckedRunnable() / CircuitBreaker.decorateCheckedFunction() 装饰 Supplier / Runnable / Function / CheckedRunnable / CheckedFunction,然后使用Vavr的Try.of(…) / Try.run(…) 调用被装饰的函数,也可以使用map / flatMap / filter / recover / andThen链接更多的函数。函数链只有在熔断器处于关闭或半开状态时才可以被调用。
private void process() {
// 创建熔断器
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
// 用熔断器包装函数
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
.decorateCheckedSupplier(circuitBreaker,
() -> "This can be any method which returns: 'Hello");
// 函数链只有在熔断器处于关闭或半开状态时才可以被调用。
// 链接其它的函数
Try<String> result = Try.of(decoratedSupplier).map(value -> value + " world'");
// 如果函数链中的所有函数均调用成功,最终结果为Success<String>
Assert.assertEquals(result.isSuccess(), true);
Assert.assertEquals(result.get(), "This can be any method which returns: 'Hello world'");
}
private void process_morecircuitBreaker() {
// 两个熔断器
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
CircuitBreaker anotherCircuitBreaker = CircuitBreaker.ofDefaults("anotherTestName");
// When I create a Supplier and a Function which are decorated by different CircuitBreakers
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
.decorateCheckedSupplier(circuitBreaker, () -> "Hello");
CheckedFunction1<String, String> decoratedFunction = CircuitBreaker
.decorateCheckedFunction(anotherCircuitBreaker, (input) -> input + " world");
// and I chain a function with map
Try<String> result = Try.of(decoratedSupplier)
.mapTry(decoratedFunction::apply);
Assert.assertEquals(result.isSuccess(), true);
Assert.assertEquals(result.get(), "Hello world");
}
private void process_failure() {
// 创建一个环状缓冲区大小为2的熔断器
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.ringBufferSizeInClosedState(2)
.waitDurationInOpenState(Duration.ofMillis(1000))
.build();
CircuitBreaker circuitBreaker = CircuitBreaker.of("testName", circuitBreakerConfig);
// 模拟一次失败调用
circuitBreaker.onError(0, new RuntimeException());
Assert.assertEquals(circuitBreaker.getState(), State.CLOSED);
// 模拟第二次失败调用
circuitBreaker.onError(0, new RuntimeException());
// 失败率超过百分之五十,熔断器被触发
Assert.assertEquals(circuitBreaker.getState(), State.OPEN);
// 由于熔断器处于打开状态,调用失败
Try<String> result = Try
.of(CircuitBreaker.decorateCheckedSupplier(circuitBreaker, () -> "Hello"))
.map(value -> value + " world");
Assert.assertEquals(result.isFailure(), true);
Assert.assertEquals(result.failed().get().getClass(), CircuitBreakerOpenException.class);
System.out.println(result.failed().get().getMessage());
}
熔断器降级
/**
* 熔断器降级
*/
private void process_recover() {
// 创建熔断器
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
// 模拟失败调用,并链接降级函数
CheckedFunction0<String> checkedSupplier = CircuitBreaker
.decorateCheckedSupplier(circuitBreaker, () -> {
throw new RuntimeException("BAM");
});
// 降级函数被调用,最终调用结果为成功
Try<String> result = Try.of(checkedSupplier).recover(throwable -> "hello recovery");
Assert.assertEquals(result.isSuccess(), true);
Assert.assertEquals(result.get(), "hello recovery");
}
熔断器失败判定
默认所有异常都被认定为失败事件,你可以定制Predicate的test检查,实现选择性记录,只有该函数返回为true时异常才被认定为失败事件。下例展示了如何忽略除WebServiceException外的所有异常:
private void process_recordfailure() {
// Given
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.ringBufferSizeInClosedState(2)
.ringBufferSizeInHalfOpenState(2)
.waitDurationInOpenState(Duration.ofMillis(1000))
/*.recordFailure(throwable ->
Match(throwable).of(
Case($(instanceOf(WebServiceException.class)), true),
Case($(), false)))*/
.build();
CircuitBreaker circuitBreaker = CircuitBreaker.of("testName", circuitBreakerConfig);
// Simulate a failure attempt
circuitBreaker.onError(0, new WebServiceException());
// CircuitBreaker is still CLOSED, because 1 failure is allowed
Assert.assertEquals(circuitBreaker.getState(), CircuitBreaker.State.CLOSED);
// When
CheckedRunnable checkedRunnable = CircuitBreaker.decorateCheckedRunnable(circuitBreaker, () -> {
throw new SocketTimeoutException("BAM!");
});
Try result = Try.run(checkedRunnable);
// Then
Assert.assertEquals(result.isFailure(), true);
// CircuitBreaker is still CLOSED, because SocketTimeoutException has not been recorded as a failure
Assert.assertEquals(circuitBreaker.getState(), State.CLOSED);
Assert.assertEquals(result.failed().get().getClass(), IOException.class);
}
监听熔断器事件
private void process_events() {
// 创建熔断器
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
circuitBreaker.getEventPublisher()
.onSuccess(event -> logger.info("onSuccess"))
.onError(event -> logger.info("onError"))
.onIgnoredError(event -> logger.info("onIgnoredError"))
.onReset(event -> logger.info("onReset"))
.onStateTransition(event -> logger.info("onStateTransition"));
// Or if you want to register a consumer listening to all events, you can do:
circuitBreaker.getEventPublisher()
.onEvent(event -> logger.info("onEvent"));
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
float failureRate = metrics.getFailureRate();
int bufferedCalls = metrics.getNumberOfBufferedCalls();
int failedCalls = metrics.getNumberOfFailedCalls();
System.out.println("failureRate : " + failureRate + ", bufferedCalls:" + bufferedCalls
+ ", failedCalls:" + failedCalls);
}