【问题标题】:Spring Cloud Stream Kafka - Eventual consistency - Does Kafka auto retry unacknowledged messages (when using autocommitoffset=false)Spring Cloud Stream Kafka - 最终一致性 - Kafka 是否自动重试未确认的消息(使用 autocommitoffset=false 时)
【发布时间】:2017-07-03 00:24:33
【问题描述】:

实施最终一致的分布式架构已被证明是一件痛苦的事。有大量博客文章讲述了如何做到这一点,但没有展示(代码)如何实际做到这一点。

我遇到的一个方面是在消息未得到确认时必须手动重试消息。

例如:我的订单服务向 Kafka 发送支付事件。支付服务订阅它并处理它,回答支付正常或支付失败

  1. 要求付款:Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service

  2. 付款正常:-> Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service

  3. 支付失败 -> Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service

重点是:

我确定何时使用同步发送将消息传递到 Kafka。但是,我必须知道付款服务已处理付款的唯一方法是期待一个应答事件(付款正常|付款失败)。

这迫使我在订单服务器中实现重试机制。如果在一段时间内没有得到答复,请使用新的 Pay 事件重试。

更重要的是,这也迫使我在支付服务中处理重复的消息,以防它们被实际处理但没有得到订单服务的答案。

我想知道如果消费者没有确认消息的新偏移量,Kafka 是否有内置机制来发送重试

在 Spring Cloud Stream 中,我们可以将 autoCommitOffset 属性设置为 false 并处理消费者中偏移量的确认:

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }

如果我们不执行acknowledgment.acknowledge();会发生什么情况,消息会被Kafka自动重发给这个消费者吗?

如果可能的话,我们不再需要手动重试,可以做这样的事情:

付款服务:

 @Autowired
 private PaymentBusiness paymentBusiness;

 @StreamListener(Sink.INPUT)
 public void process(Order order) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         paymentBusiness(order);            
         //If we don't get here because of an exception 
         //Kafka would retry...
         acknowledgment.acknowledge();
     }
 }

如果可以的话,Kafka 中的重试周期是如何配置的?

在最坏的情况下(也是最有可能的情况),这不受支持,我们必须手动重试。您知道 Spring Cloud Stream 应用程序使用 Kafka 处理最终一致性的任何真实示例吗?

【问题讨论】:

    标签: spring-cloud microservices distributed-transactions spring-cloud-stream spring-kafka


    【解决方案1】:

    如果我们不执行 acknowledgement.acknowledge(); 会发生什么消息会被 Kafka 自动重新发送给这个消费者吗?

    没有。只要客户端打开,Kafka 消费者就会按顺序读取消息。 Kafka 不支持更复杂的确认模式,例如单个消息确认,仅更新给定消费者组和分区主题的偏移量。 Spring Cloud Stream 支持手动确认 Spring Cloud Stream 中的消息,用于异步处理它们的场景(从而防止消息丢失) - 但假设一旦手动确认消息,它的偏移量就会被保存,因此所有先前的消息都来自同一个partition-topic 将被视为“已读”。如果您想挑选出失败的消息,您可以使用 DLQ 支持 - 并让后续消费者接收它们。重新启动消费者将从上次保存的偏移量继续读取,因此您可以选择不为一系列未成功处理的消息保存偏移量。

    Spring Cloud Stream 消费者具有内置的重试和 DLQ 支持 - 请参阅 http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_properties 中的 enableDlq 以及作为默认消费者属性的一部分提供的重试设置:http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_consumer_properties

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-07-18
      • 1970-01-01
      • 2018-03-09
      • 2021-06-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多