【问题标题】:How can understand if event is sent successfully to consumers?如何知道事件是否成功发送给消费者?
【发布时间】:2021-07-05 17:10:35
【问题描述】:

我有一个使用 Spring Stream Kafka 发布事件的主要服务,我有 2 个不同的服务使用这个事件。这些服务应该使用事件来完成这个过程。

Order Service(Publisher-OrderEvent) -- Stock Service(Listener-OrderEvent) -- Payment Service(Listener-OrderEvent)

(应完成库存检查和付款以完成订单)

如何才能知道这个事件是否从order service成功发送到这两个service呢? 我需要知道其中一个是否已关闭。如果它关闭/其中一项服务无法收到事件,我需要取消订单服务。 你们有这个案例的示例实现吗?

提前谢谢你。

【问题讨论】:

  • 是时候了解“最终一致性”了。
  • 如果该服务器再次启动时收到事件,我没问题,但它也没有收到旧的/未处理的事件.. 等待新事件
  • 那你的服务器订阅主题配置错误。
  • 你能分享一下实现的例子吗

标签: java spring-boot apache-kafka apache-kafka-streams spring-cloud-stream


【解决方案1】:

选项 1:使用指标和records-consumed-total

从消费者端,监控这个属性/metric with the client id

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"

查看records-consumed-total 属性的内部,一旦收到它就会增加。


选项 2:使用 回调

您也可以使用call back 来验证它是否已发送

"...完全非阻塞使用可以利用 Callback 参数提供一个回调,请求完成时将调用该回调"

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          //... your code message was received!!
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });

【讨论】:

  • 选项 2 不保证任何消费者都会阅读它。选项 3 - 消费者组滞后监控
猜你喜欢
  • 2013-08-19
  • 2012-10-17
  • 2017-03-31
  • 1970-01-01
  • 2016-04-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多