【问题标题】:Is message deduplication essential on the Kafka consumer side?消息重复数据删除在 ​​Kafka 消费者端是否必不可少?
【发布时间】:2021-12-31 14:43:47
【问题描述】:

Kafka documentation 声明以下是最重要的场景:

实时处理付款和财务交易,例如 在证券交易所、银行和保险业

此外,关于主要概念,就在最顶部:

Kafka提供处理能力等各种保证 事件恰好一次

文件说的很有趣:

许多系统声称提供“恰好一次”交付语义,但 阅读细则很重要,这些声明中的大多数都是 误导

付款/金融交易必须“精确一次”处理似乎很明显,但 Kafka 文档的其余部分并没有明确说明应该如何完成。 让我们关注生产者/发布者方面:

如果生产者尝试发布消息并遇到网络 错误它不能确定这个错误是发生在之前还是之后 消息已提交。这类似于插入的语义 使用自动生成的键进入数据库表。 … 自 0.11.0.0 起, Kafka 生产者还支持幂等交付选项, 保证重新发送不会导致重复条目 记录。

KafkaProducer 仅确保它不会错误地重新提交消息(导致重复)本身。 Kafka 无法涵盖客户端应用程序代码崩溃(以及 KafkaProducer)的情况,并且不确定它之前是否调用了 send(或事务生产者的情况下的 commitTransaction),这意味着应用程序级重试将导致重复处理。

对于其他目的地系统,通常只进行一次交付 需要与此类系统合作,但 Kafka 提供了偏移量 这使得实现这一点变得可行(另请参阅 Kafka Connect)。

上述陈述只是部分正确,这意味着虽然它在消费者端公开了偏移量,但在生产者端完全不可行。

Kafka 消费-过程-生产循环利用 sendOffsetsToTransaction 启用了一次性处理,但同样无法涵盖链中第一个生产者可能出现重复的情况。

官方提供的demo for EOS(Exactly once 语义)只提供了一个consume-process-produce EOS的例子。

涉及读取已提交事务的 DB 事务日志读取器的解决方案也无法确定它们是否会在崩溃时产生重复消息。

不支持涉及数据库和 Kafka 生产者的分布式事务 (XA)。


所有这一切是否意味着为了确保对支付和金融交易进行一次处理(Kafka 顶级用例!),我们绝对必须在消费者端执行业务级消息重复数据删除,尽管有 Kafka 传输级别的“保证”/索赔?

注意:我知道: Kafka Idempotent producer 但如果重复数据删除在消费者方面是不可避免的,我想要一个明确的答案。

【问题讨论】:

  • 请将您的帖子限制在单个问题或特定问题上,并且您链接到错误的演示 github.com/apache/kafka/blob/2.5/examples/src/main/java/kafka/…
  • 感谢您的关注!您是否建议我删除所有“它是否正确”和“官方示例”问题,只剩下最后一个问题?我链接到仅由 1 个消费者和 1 个生产者组成的演示,因为我对端到端 EOS 感兴趣。您提出的演示演示了消费过程生产,其中 ExactlyOnceMessageProcessor 已插入消费者和生产者之间。该演示再次没有演示如何防止生产者端重复,因此如果我们必须在消费者端执行重复数据删除,它无助于具体回答。
  • 据我所知,事务支持是客户端内部的,而且它每批只支持一次。如果您必须在整个应用程序生命周期中防止重复,在重新启动时,Kafka 无法控制。我记得以前的一个项目,我认为财务团队使用 Mongo 或 Hbase 来执行两阶段提交,而不会对 Kafka 重复
  • 而链接的demo只是普通的消费者和生产者,没有交易
  • kafka 文档中关于“Exactly once”处理不同阶段(如生产者、消费者、侧消费者的业务逻辑)的含义。例如:消息基础设施只传递一次,但消费者中的业务逻辑可能会因为某些问题或合法原因而对其进行两次处理。

标签: apache-kafka duplicates kafka-producer-api payment-processing


【解决方案1】:

您必须在消费者端进行重复数据删除,因为消费者端的重新平衡确实会导致根据获取大小和提交间隔参数在消费者组中多次处理事件。

如果消费者在未向代理确认的情况下退出,Kafka 会将这些事件分配给组中的另一个消费者。例如,如果您要提取 5 个事件的批量大小,如果消费者在处理前 3 个事件后死亡或重新启动(如果外部 api/db 失败,或者更糟糕的情况是您的服务器内存不足并崩溃),当前消费者死亡突然没有向经纪人提交/确认。因此,相同的批次从组(重新平衡)分配给另一个消费者,它开始再次提供相同的事件批次,这将导致重新处理相同的记录集,从而导致重复。在这里阅读:https://quarkus.io/blog/kafka-commit-strategies/

您可以使用 Kafka 的内部状态存储进行重复数据删除。这里没有偏移/分区跟踪,它是一种缓存(集群上的持久时间限制)。

在我的例子中,我们在成功处理事件时将相关 ID(传入事件中的唯一业务标识符)推送到其中,并且在处理之前检查所有新事件以确保其不是重复事件。启用状态存储将在 Kafka 集群中创建更多内部主题,仅供参考。

https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#state-stores

【讨论】:

  • 有趣 - 虽然我预计重复的原因可能是由于生产者端的设计,但您指出即使是消费者端也可能导致重复处理。您能否为第一段中的陈述提供参考/编辑答案?
  • 更新答案。
猜你喜欢
  • 2020-10-16
  • 1970-01-01
  • 2017-11-09
  • 1970-01-01
  • 1970-01-01
  • 2020-01-06
  • 1970-01-01
  • 1970-01-01
  • 2015-04-19
相关资源
最近更新 更多