【发布时间】:2017-07-03 00:24:33
【问题描述】:
实施最终一致的分布式架构已被证明是一件痛苦的事。有大量博客文章讲述了如何做到这一点,但没有展示(代码)如何实际做到这一点。
我遇到的一个方面是在消息未得到确认时必须手动重试消息。
例如:我的订单服务向 Kafka 发送支付事件。支付服务订阅它并处理它,回答支付正常或支付失败
要求付款:
Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service付款正常:->
Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service支付失败 ->
Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service
重点是:
我确定何时使用同步发送将消息传递到 Kafka。但是,我必须知道付款服务已处理付款的唯一方法是期待一个应答事件(付款正常|付款失败)。
这迫使我在订单服务器中实现重试机制。如果在一段时间内没有得到答复,请使用新的 Pay 事件重试。
更重要的是,这也迫使我在支付服务中处理重复的消息,以防它们被实际处理但没有得到订单服务的答案。
我想知道如果消费者没有确认消息的新偏移量,Kafka 是否有内置机制来发送重试。
在 Spring Cloud Stream 中,我们可以将 autoCommitOffset 属性设置为 false 并处理消费者中偏移量的确认:
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
如果我们不执行acknowledgment.acknowledge();会发生什么情况,消息会被Kafka自动重发给这个消费者吗?
如果可能的话,我们不再需要手动重试,可以做这样的事情:
付款服务:
@Autowired
private PaymentBusiness paymentBusiness;
@StreamListener(Sink.INPUT)
public void process(Order order) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
paymentBusiness(order);
//If we don't get here because of an exception
//Kafka would retry...
acknowledgment.acknowledge();
}
}
如果可以的话,Kafka 中的重试周期是如何配置的?
在最坏的情况下(也是最有可能的情况),这不受支持,我们必须手动重试。您知道 Spring Cloud Stream 应用程序使用 Kafka 处理最终一致性的任何真实示例吗?
【问题讨论】:
标签: spring-cloud microservices distributed-transactions spring-cloud-stream spring-kafka