【问题标题】:Reset the JDBC Kafka Connector to start pulling rows from the beginning of time?重置 JDBC Kafka 连接器以从一开始就开始拉行?
【发布时间】:2017-08-17 16:12:43
【问题描述】:

Kafka 连接器可以利用主键和时间戳来确定需要处理哪些行。

我正在寻找一种方法来重置连接器,以便它从一开始就处理。

【问题讨论】:

  • 这是在分布式模式下运行还是独立运行?在独立中,您可以删除我相信的偏移文件。在分布式模式下,最简单的做法可能是更改连接器名称。无论哪种方式,如果您只是为了您的意识而这样做,就会将重复的数据发送到 Kafka。
  • 它将以分布式模式运行,因为它需要一个大型集群来处理我们需要连接的所有数据库。是的,这是有道理的。我只是想了解如何解决数据问题。

标签: sql-server apache-kafka apache-kafka-connect


【解决方案1】:

因为要求是在分布式模式下运行,所以最简单的做法是将连接器名称更新为新值。这将提示在 connect-offsets 主题中添加一个新条目,因为它看起来像一个全新的连接器。然后连接器应该再次开始读取,就好像还没有向 Kafka 写入任何内容一样。您还可以手动向与该特定连接器关联的连接偏移主题中的键发送一条墓碑消息,但重命名比处理它要容易得多。此方法适用于所有源连接器,而不仅仅是此处描述的 JDBC。

【讨论】:

  • 即使我有类似的要求,我也需要从某个时间重新处理 binlog 消息,例如将偏移量移动到上周。我们能做到这一点吗?我正在使用 Debezium 连接器。
  • 难道不能将连接器配置更新为批量模式,让它运行,然后再改回时间戳模式吗?
  • 是的。可以从批量模式切换到时间戳模式更新工作配置。但是,对于连接器而言,这种更改就像是完成了一项新任务。
【解决方案2】:

我有点厌倦了在开发过程中每次重命名连接器,所以开始使用 tombstone 方法。此方法可用于任何源连接器。

首先检查连接器的key/value的格式:

kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-connect-offsets --from-beginning --property print.key=true
["demo",{"query":"query"}] {"timestamp_nanos":542000000,"timestamp":1535768081542}
["demo",{"query":"query"}] {"timestamp_nanos":171831000,"timestamp":1540435281171}
["demo",{"query":"query"}] {"timestamp_nanos":267775000,"timestamp":1579522539267}

通过发送不带任何值的密钥来创建墓碑消息:

echo '["demo",{"query":"query"}]#' | kafka-console-producer --bootstrap-server localhost:9092 --topic kafka-connect-offsets --property "parse.key=true" --property "key.separator=#"

现在重新启动或重新创建连接器,它将再次开始生成消息。

除非您真的知道自己在做什么,否则在生产中要非常小心。这里有更多信息:https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/

【讨论】:

    猜你喜欢
    • 2022-01-13
    • 2022-10-04
    • 2019-08-18
    • 1970-01-01
    • 2021-10-24
    • 1970-01-01
    • 2015-05-09
    • 2015-04-18
    • 2021-01-11
    相关资源
    最近更新 更多