【问题标题】:KafkaIO checkpoint - how to commit offsets to KafkaKafkaIO 检查点 - 如何向 Kafka 提交偏移量
【发布时间】:2018-07-02 13:04:05
【问题描述】:

我正在使用 Google Dataflow 中的 Beam KafkaIO 源运行作业,但找不到一种简单的方法来在作业重新启动时保持偏移量(作业更新选项不够,我需要重新启动作业)

将 Beam 的 KafkaIO 与 PubSubIO 进行比较(或者更准确地说,将 PubsubCheckpoint 与 KafkaCheckpointMark 进行比较)我可以看到检查点持久性在 KafkaIO 中没有实现(KafkaCheckpointMark.finalizeCheckpoint 方法为空),而它在 PubsubCheckpoint.finalizeCheckpoint 中实现,它确实对 PubSub 进行了确认。

这是否意味着我无法在作业重新启动时以最小的努力可靠地管理 Kafka 偏移量?

到目前为止我考虑过的选项:

  1. 实现我自己的持久偏移逻辑 - 听起来很复杂,我在 Scala 中通过 Scio 使用 Beam。

  2. 什么都不做,但那会导致作业重新启动时出现许多重复(主题有 30 天的保留期)。

  3. 启用自动提交,但这会导致消息丢失,更糟糕的是。

【问题讨论】:

  • 你最好的办法是试试看。 SO 不是寻求设计帮助的好地方。

标签: scala apache-kafka google-cloud-dataflow apache-beam


【解决方案1】:

有两个选项:在 KafkaIO 中启用 commitOffsetsInFinalize() 或在 Kafka 消费者配置中启用自动提交。请注意,虽然commitOffsetsInFinalize() 与 Beam 中处理的内容比 Kafka 的自动提交更同步,但它并不能提供强有力的保证一次性处理。想象一个两阶段的管道,Dataflow 在第一阶段之后完成 Kafka 阅读器,而无需等待第二阶段完成。如果您当时从头开始重新启动管道,您将不会处理已完成第一阶段但尚未被第二阶段处理的记录。 PubsubIO 的问题也不例外。

Regd 选项 (2) :您可以将 KafkaIO 配置为从特定时间戳开始读取(假设 Kafka 服务器支持它(版本 10+))。但看起来并不比启用 auto_commit 更好。

也就是说,KafkaIO 应该支持 finalize。使用起来可能比启用 auto_commit 更简单(需要考虑频率等)。我们没有很多用户要求它。如果可以,请在 user@beam.apache.org 上提及。

[更新:我在 PR 4481 中添加了对向 KafkaCheckpointMark 提交偏移量的支持]

【讨论】:

  • 我想根据最后一批保存到数据库的偏移量来实现我自己的偏移量处理程序,并禁用 ceckpointing。我可以在 KafkaIO 中使用 ConsumenrFactoryFn,然后使用 seek (partion,offset) 在其序列化函数中保存最后一批的 DB 偏移量吗?
  • 感谢您的解释。它帮助了我。然而,在 PR 中有这样的声明“但它不提供硬处理保证。在调用 CheckpointMark#finalizeCheckpoint() 后提交可能会有短暂的延迟,因为读取器可能会在从 Kafka 读取时被阻止。”所以,如果我理解正确,不能保证避免重复,但可以保证记录不会被正确删除(即保证不存在间隙)?
  • @Pradyumna,是的。仍然有可能获得一小部分重复项(通常在工作人员重新启动时),但不应有任何间隙。
猜你喜欢
  • 2017-06-24
  • 1970-01-01
  • 2020-01-27
  • 1970-01-01
  • 2016-06-01
  • 2021-11-14
  • 1970-01-01
  • 2017-03-17
  • 1970-01-01
相关资源
最近更新 更多