【问题标题】:spring kafka cloud stream: limit retry attempts in batch modespring kafka cloud stream:在批处理模式下限制重试尝试
【发布时间】:2021-08-20 12:00:20
【问题描述】:

当在消费消息时抛出异常时,spring 会尝试不断地读取相同的消息,并且基本停止消费其他消息。我尝试设置 defaultRetryableretryableExceptions 属性,如下所示:

spring:
  cloud.stream:
    bindings:
      consumer-in-0:
        consumer:
          defaultRetryable: false
          retryable-exceptions:
            org.springframework.dao.DataIntegrityViolationException: false

写在这里https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_retry_template_and_retrybackoff

但它没有效果,如何禁用重复尝试读取失败消息或限制此类尝试的次数?

更新

看spring源码KafkaMessageChannelBinder

protected MessageProducer createConsumerEndpoint() {
    // ...
    if (!extendedConsumerProperties.isBatchMode()
                && extendedConsumerProperties.getMaxAttempts() > 1
                && transMan == null) {

            kafkaMessageDrivenChannelAdapter
                    .setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));

所以看起来提到的属性仅在不使用批处理模式时才有效,这是我的情况 (batch==true)。想知道如何在批处理模式下处理重试?

【问题讨论】:

    标签: spring apache-kafka spring-cloud-stream


    【解决方案1】:

    默认情况下,批处理侦听器会永远重试,因为框架无法判断批处理中的哪条记录失败。

    最好在侦听器本身中以批处理模式处理错误。

    您可以添加一个ListenerContainerCustomizer bean 来配置不同的BatchErrorHandler。选项见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-09-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多