【问题标题】:per message acknowledgement in Kafka / RabbitMQKafka / RabbitMQ 中的每条消息确认
【发布时间】:2019-03-02 21:42:49
【问题描述】:

我们有一个可以工作的 rabbitmq .implementation ,由于量大,我们计划切换到 kafka。

我有一个疑问。

在 rabbitMQ 中,当消费者从 Q 消费消息时,消息进入不同的阶段,即未确认阶段。客户端/消费者需要一些时间来处理消息,在成功处理后,它会向 Q 发送确认消息,并且消息会从 Q 中删除。如果不成功,在定义的时间段后,如果 Q 没有得到确认,则消息是附加在 Q 的末尾。这样我们就不会丢失任何消息。

根据我对 Kafka 的了解,我知道如果例如消息 100 未成功处理,则偏移量不会增加,但如果消息 101 处理成功,则偏移量会增加。所以我丢失了消息 100。

有没有办法保证所有消息都不会丢失。

【问题讨论】:

  • 您必须实现死信队列 (DLQ)。
  • 为什么不将 Solace 视为更接近 Rabbit 的消息传递替代品?类似的 API + 语义(例如每个消息的 ACK),但更好的卷处理。

标签: apache-kafka rabbitmq producer-consumer


【解决方案1】:

Kafka 不会从主题中删除消息,除非它到达 log.retention.bytes log.retention.hours log.retention.minutes log.retention.ms 配置之一。因此,如果偏移量增加,您不会丢失以前的消息,您可以简单地将偏移量更改为您想要的位置。

【讨论】:

  • @admin ,我的消费者没有跟踪,我们有数百个客户端,正在消费 RabbitMQ 消息。他们消费并确认每条消息。 RabitMQ 负责删除成功消费的消息,并为不成功的消息再次发布回 Q。现在,如果客户端的偏移量在没有处理消息的情况下向前移动,客户端如何再次使用相同的消息,而不在客户端进行更改。
【解决方案2】:

我也遇到了同样的问题。如果我想简单地说,RabbitMQ 会记录每个

  1. 已发布但未使用
  2. 已发布、已使用和未确认的消息。

卡夫卡没有,所以你不能把它做好,你必须自己实现它。

虽然有选项,使用kmq,性能会低于50%,看看

https://softwaremill.com/kafka-with-selective-acknowledgments-performance/

【讨论】:

    【解决方案3】:

    除非您轮询新消息,否则您的消息偏移量不会增加。因此,您必须担心重新处理您的消息。

    如果您想将数据处理的结果存储到Kafka集群中,您可以使用transaction feature of Kafka。这样您就可以支持一次交付。您的所有更改都将被保存,否则不会存储任何更改。

    另一种方法是使您的处理方案具有幂等性。您将为 Kafka 中的每条消息分配一个唯一 ID。处理消息时,将 ID 存储在数据库中。崩溃后,您可以通过查看数据库来检查您的消息 ID 是否已被处理。

    【讨论】:

      【解决方案4】:

      您应该阅读一些关于 Kafka 中的消息消费如何工作的内容。以下是 Kafka 官方文档消费者部分的链接:https://kafka.apache.org/documentation/#theconsumer

      基本上,在 Kafka 中,消息只有在经过足够的时间后才会被删除,就像 @Amin 所说的那样,使用 log.retention.hourslog.retention.minuteslog.retention.ms 进行配置。

      在 Kafka 中,任何数量的消费者都可以在任何时候开始消费来自任何主题的消息,而不管其他消费者是否已经从同一个主题消费。 Kafka 使用存储在 Kafka 本身中的偏移量来跟踪每个消费者在每个主题/分区上的位置。因此,如果您的消费者需要使用消息 100,就像您在问题中描述的那样,您可以简单地“倒回”到所需的消息,然后再次开始正常消费。无论您之前是否使用过它,或者其他消费者是否正在阅读该主题,都无关紧要。

      来自 Kafka 官方文档:

      消费者可以故意回退到旧的偏移量并 重新消费数据。这违反了队列的通用合同,但是 事实证明,对于许多消费者来说,这是一项必不可少的功能。例如, 如果消费者代码有错误并且在一些消息后被发现 被消费了,消费者可以在出现错误后重新消费那些消息 是固定的。

      【讨论】:

      • 谢谢@mjuarez,我的场景有点不同,让我解释一下,所以我每个主题只有一个消费者/读者,如果我有多个,它们是为了并行,所以他们会阅读/消费不同的消息。但是消费者/阅读者在处理消息失败并转发到下一个消息时,可能消息仍会保留在kafka中,但它会不断增加偏移量,只读取新消息。所以失败的消息将不会被处理再次 。对于 rabbitMq 这很容易,因为它将消息移回 Q 。 Kafka 为每个消费者保留一个偏移量,RabbitMQ 每条消息。
      • 只需配置客户端一次读取 1 条 kafka 消息,并且仅在成功处理每条消息后提交偏移量,然后您将具有与 rabbitmq 类似的行为(仅在使用主题分区进行分片时更快和多个消费者在一个共同的消费者组中)它会比Kafka客户端批量消费消息的普通模式慢,但仍然更快
      • 另外,kafka 客户端通常为每个消费者组的每个分区保留一个偏移量,因此在单个消费者组中具有 16 个分区和 4 个消费者的主题将保留 16 个偏移量(每个分区一个)。
      • @Hans 这是一个可行的解决方案,但最终会使进程变慢。有没有其他选择,否则我们必须坚持使用旧的 RabbitMq
      • 另一种方法是轮询成批的 Kafka 消息并管理自己的偏移量。如果您必须跳过一条消息,则将其偏移量写入死消息队列或外部非 Kafka 系统以供以后重新处理。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-11-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多