【问题标题】:How to retry failed ConsumerRecord in reactor-kafka如何在reactor-kafka中重试失败的ConsumerRecord
【发布时间】:2021-11-22 19:29:20
【问题描述】:

我正在尝试使用 reactor-kafka 来消费消息。其他一切正常,但我想为失败的消息添加重试(2)。 spring-kafka 默认已经重试了 3 次失败记录,我想用 reactor-kafka 来实现。

我使用 spring-kafka 作为响应式 kafka 的包装器。以下是我的消费者模板:

reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .map(ConsumerRecord::value)
                .flatMap(this::consumeWithRetry)
                .onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
                .retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
                .subscribe();

让我们考虑consume方法如下

public Mono<Void> consume(MessageRecord message){
   return Mono.error(new RuntimeException("test retry"); //sample error scenario
}

我正在使用以下逻辑在失败时重试使用方法。

public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}

如果当前消费者记录因异常而失败,我想重试使用该消息。我试图用另一个 retry(3) 包装消耗方法,但这并没有达到目的。最后一次 retryWhen 仅用于在 kafka rebalances 上重试订阅。

@simon-baslé @gary-russell

【问题讨论】:

    标签: spring-kafka reactor reactor-kafka


    【解决方案1】:

    之前重试时我使用的是以下方法:

    public Mono<Void> consumeWithRetry(MessageRecord message){
       return consume(message)
              .retry(2);
    }
    

    但它没有重试。添加 Mono.defer 后,上面的代码可以工作并添加所需的重试。

    public Mono<Void> consumeWithRetry(MessageRecord message){
       return Mono.defer(()->consume(message))
              .retry(2);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-07-22
      • 2019-08-29
      • 1970-01-01
      • 1970-01-01
      • 2019-05-20
      • 2020-07-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多