【问题标题】:How to configure Kafka consumer retry property from application.properties in spring boot?如何在 Spring Boot 中从 application.properties 配置 Kafka 消费者重试属性?
【发布时间】:2022-01-09 12:36:32
【问题描述】:

在spring-boot中,

application.yml

kafka:
   bootstrap-servers: localhost:9092
   listener:
    concurrency: 10
    ack-mode: MANUAL
   producer:
    topic: test-record
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    retries: 3
    orn-record:
      timeout: 3
    #acks: 1
   consumer:
    groupId: test-record
    topic: test
    enable-auto-commit: false
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

通过使用上述配置,我们可以避免spring boot中的java web(bean) based configuration,这是一个非常值得的优势。

问:我们可以从 application.properties / application.yml 添加 kafka 错误处理程序和 kafka 消费者重试属性数吗?

我找不到任何关于它的参考或文档,因此希望得到一些结论,只是因为这个问题,现在我必须在 Spring Boot 中转到 java web based configuration 并删除再次回到 @ 中的旧方式的属性配置987654325@。我相信应该有一些解决方法,我们可以通过property file configuration 实现这一点。

【问题讨论】:

    标签: spring spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    消费者没有重试属性。如果未提交偏移量,则下一次轮询将从相同的偏移量重试。

    也没有任何可配置的错误处理类,它与 Kafka Streams 中的反序列化一样是带外的。

    如果你想处理反序列化错误,而不是处理错误,你可以这样设置

    spring:
      kafka:
        bootstrap-servers: ...
        consumer:
          # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        properties:
          # Delegate deserializers
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    

    除此之外,您还可以利用代码中的DeadLetterPublishingRecovererSeekToCurrentErrorHandler 将错误事件生成到新主题以供检查和进一步处理。 - Source

    【讨论】:

      猜你喜欢
      • 2023-03-24
      • 2019-05-12
      • 1970-01-01
      • 2016-12-09
      • 1970-01-01
      • 2019-04-30
      • 2023-03-27
      • 2018-10-20
      • 1970-01-01
      相关资源
      最近更新 更多