【问题标题】:Different retry strategies for different consumers in KafkaKafka中针对不同消费者的不同重试策略
【发布时间】:2019-12-22 15:51:37
【问题描述】:

我们正在处理一个场景,我们需要针对同一应用程序中的不同消费者使用不同的重试策略。

请参考下图(简要架构图):

main_consumer 使用来自main_topic 的有效负载并尝试将其发送到 API。如果 API 处理失败,我们会将此失败的有效负载写入另一个名为 error_topic 的主题中。有一个不同的消费者 (error_consumer)error_topic 消费,并通过 3 次重试尝试再次将有效负载发送到 API。如果仍然失败,则error_consumer 将此有效负载推送到DLQ

我们面临的问题:

我们需要main_consumer 不重试失败,error_consumer 重试失败 3 次。我们将maxAttempts 作为main_consumer 的1 和maxAttempts 作为error_consumer 的3。但是使用这种配置,main_consumer 会重试 3 次,error_consumer 会重试一次。它的工作方式与我们的预期完全相反。

P.S : 我们尝试为两个消费者交换 maxAttempts 是徒劳的(这是不合逻辑的)。

下面是我们正在使用的 Spring 云流应用配置:

我们正在使用以下配置文件运行应用程序。

application-main.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          main-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        main-consumer-channel:
          destination: main_topic
          consumer:
            maxAttempts: 1
            backOffInitialInterval: 5000
            backOffMultiplier: 2

application-error-retry.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          error-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        error-consumer-channel:
          destination: error_topic
          consumer:
             maxAttempts: 3
             backOffInitialInterval: 5000
             backOffMultiplier: 2

【问题讨论】:

    标签: spring-boot apache-kafka kafka-consumer-api spring-cloud-dataflow spring-retry


    【解决方案1】:

    这对我来说很好......

    @SpringBootApplication
    @EnableBinding(Inputs.class)
    public class So57522645Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57522645Application.class, args);
        }
    
        @StreamListener("input1")
        public void listen1(String in) {
            System.out.println("main: " + in);
            throw new RuntimeException("fail");
        }
    
        @StreamListener("input2")
        public void listen2(String in) {
            System.out.println("error: " + in);
            throw new RuntimeException("fail");
        }
    
        @StreamListener("input3")
        public void listen3(String in) {
            System.out.println("final: " + in);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> template.send("main", "test".getBytes());
        }
    
    }
    
    interface Inputs {
    
        @Input
        MessageChannel input1();
    
        @Input
        MessageChannel input2();
    
        @Input
        MessageChannel input3();
    
    }
    
    spring:
      cloud:
        stream:
          bindings:
            input1:
              consumer:
                max-attempts: 1
              destination: main
              group: grp1
            input2:
              consumer:
                max-attempts: 3
              destination: error.main.grp1
              group: grp2
            input3:
              destination: error.error.main.grp1.grp2
              group: grp3
          kafka:
            bindings:
              input1:
                consumer:
                  enable-dlq: true
              input2:
                consumer:
                  enable-dlq: true
    

    main: test
    error: test
    error: test
    error: test
    final: test
    

    【讨论】:

      【解决方案2】:

      每个 spring 文档 - https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html,maxAttempts 配置应该在

      “spring.cloud.stream.bindings..consumer.”

      在您的配置中,它看起来像在

      “spring.cloud.stream.kafka.bindings..consumer.”

      MaxAttempts 似乎不是 kafka 绑定道具的有效配置 - https://github.com/spring-cloud/spring-cloud-stream-binder-kafka

      【讨论】:

      • 没错; maxAttempts 在所有活页夹中都很常见。
      • @Nitin 感谢您指出这一点。这是我在这里提出要问的问题时犯的错误。我现在已经用我原来的配置更新了这个问题。
      • @GaryRussell 我不明白你在说什么。这是否意味着我们不能为多个活页夹多次重试maxAttempts?如果否,我们可以尝试其他方法吗?非常感谢您的宝贵时间。
      • 否;这根本不是我的意思。有适用于所有活页夹的通用绑定属性(例如maxAttempts)和特定于活页夹的绑定属性。每个绑定可以有不同的属性值。看我的回答;你的场景对我来说很好。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-01-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-21
      • 2023-04-11
      相关资源
      最近更新 更多