【问题标题】:Avoid Data Loss While Processing Messages from Kafka处理来自 Kafka 的消息时避免数据丢失
【发布时间】:2021-01-22 23:39:10
【问题描述】:

寻找设计我的 Kafka Consumer 的最佳方法。基本上我想看看什么是避免数据丢失的最佳方法,以防万一 处理消息期间的异常/错误。

我的用例如下。

a) 我使用 SERVICE 处理消息的原因是 - 将来我计划编写一个 ERROR PROCESSOR 应用程序,该应用程序将在一天结束时运行,它将尝试处理失败的消息(不是所有消息,但由于缺少父项等任何依赖关系而失败的消息)再次。

b) 我想确保消息丢失为零,因此我会将消息保存到文件中,以防在将消息保存到数据库时出现任何问题。

c) 在生产环境中,可能有多个消费者和服务实例正在运行,因此多个应用程序很有可能尝试写入 同一个文件。

Q-1) 写入文件是避免数据丢失的唯一选择吗?

Q-2) 如果是唯一选项,如何确保多个应用程序写入同一个文件并同时读取?请考虑将来一旦错误处理器 正在构建,它可能正在从同一个文件中读取消息,而另一个应用程序正在尝试写入该文件。

错误处理器 - 我们的消息来源遵循事件驱动机制,有时依赖事件(例如,某事物的父实体)很有可能会延迟几天.所以在这种情况下,我希望我的错误处理器多次处理相同的消息。

【问题讨论】:

  • 如果您使用 spark 来消费消息,那么您可以查看 spark 检查点。

标签: apache-kafka kafka-consumer-api file-writing


【解决方案1】:

我以前遇到过类似的事情。所以,直接进入你的问题:

  • 不一定,您也许可以在新主题中将这些消息发送回 Kafka(比方说 - error-topic)。因此,当您的错误处理器准备就绪时,它可以侦听 this error-topic 并在这些消息进入时使用它们。

  • 我认为已针对第一个问题解决了这个问题。因此,与其使用文件来写入和读取并同时打开多个文件句柄来执行此操作,Kafka 可能是更好的选择,因为它专为此类问题而设计。

注意:以下几点只是基于我对您的问题领域的有限理解的一些思考。因此,您可以选择安全地忽略它。

service 组件的设计中还有一点值得考虑 - 您不妨考虑通过将所有错误消息发送回 Kafka 来合并第 4 点和第 5 点。这将使您能够以一致的方式处理所有错误消息,而不是将一些消息放在错误数据库中,而将一些消息放在 Kafka 中。

编辑:根据错误处理器要求的附加信息,这里是解决方案设计的图解。

为了保持通用性,我现在特意保留了 ERROR PROCESSOR 抽象的输出。

我希望这会有所帮助!

【讨论】:

  • Lalit,我最初也在考虑类似的问题,但在我的情况下,我不希望所有失败的消息都再次被错误处理器重新处理。理想情况下,我的 ERROR 处理器会提醒生产应用程序,如果有任何消息由于验证问题/解析错误而失败。并且它应该重新处理由于依赖问题而失败的消息(例如,可能是依赖实体迟到了从源系统到达)。另外,我预计将来会有很多类似的用例,所以希望错误处理器是通用的。
  • 好的。您是说您的 ERROR PROCESSOR 将位于您的 SERVICE 组件之后?并且每条消息都需要经过 SERVICE 中的处理,然后才能输入到 ERROR PROCESSOR?
  • 如果 ERRPR PROCESSOR 和 SERVICE 可以并行放置,那么您仍然可以通过使 ERROR PROCESSOR 成为从该错误主题中消费的独立微服务来实现通用功能。如果您打算在将来阅读各种错误,那么可以在消息头或分区级别中进行区分,这将使错误处理器能够以不同的方式处理它们。
  • 我已经更新了我的问题以包含错误处理器的要求。我正在寻找处理错误的最佳选择 - 我想确保我身边没有遗漏任何消息。不确定移动到数据库或移动到错误主题是最佳选择。如果我们使用 ERROR TOPIC 选项,将来,ERROR PROCESSOR 可能必须处理来自 20 多个 ERROR 主题的消息;而且,消息需要被处理器多次处理;
  • 好吧,您不需要创建 20 多个主题。您可以改为在同一主题中编写所有类型的错误。然后,让错误消息具有处理所有这些的逻辑。我认为这将是一个更简洁的设计,即服务接收消息,处理它们并将它们发送到数据库,所有错误消息都转到错误主题,错误处理器处理它们。我相信这将适合您的事件驱动架构,因为 Kafka 专为此类用例而设计。
【解决方案2】:

如果您在写入数据库之前没有提交消费的消息,那么在 Kafka 保留消息时不会丢失任何内容。这样做的代价是,如果消费者确实提交了数据库,但 Kafka 偏移提交失败或超时,您最终将再次使用记录,并且可能在您的服务中处理重复项。

即使您确实写入了文件,也无法保证您的顺序,除非您为每个分区打开一个文件,并确保所有消费者只在一台机器上运行(因为您在那里保留状态,这不是容错)。重复数据删除仍然需要处理。

此外,您可以查看 Kafka Connect 框架,而不是将您自己的消费者写入数据库。为了验证消息,您可以类似地部署一个 Kafka Streams 应用程序来过滤掉来自输入主题的不良消息,并将其发送到一个主题以发送到数据库

【讨论】:

  • 我没有将偏移量提交给 DB,我依靠 Kafka 提交来跟踪消息。此外,在我的情况下,排序并不重要。但我将探索 kafka 流,但请你用一些例子更新答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-06-07
  • 2022-08-16
  • 1970-01-01
  • 2014-10-20
  • 2014-03-24
  • 2021-04-27
  • 2020-09-19
相关资源
最近更新 更多