【问题标题】:Using Kafka Connect HOWTO "commit offsets" as soon as a "put" is completed in SinkTask在 SinkTask 中完成“放置”后立即使用 Kafka Connect HOWTO“提交偏移量”
【发布时间】:2017-12-25 07:36:31
【问题描述】:

我正在使用 Kafka Connect 从 Kafka 代理 (v0.10.2) 获取消息,然后将其同步到下游服务。

目前,我在SinkTask#put 中有代码,它将处理SinkRecord,然后将其持久化到下游服务。

几个关键要求,

  1. 我们需要确保消息至少持久化到下游服务一次。
  2. 如果下游服务抛出错误或说它没有处理消息,那么我们需要确保再次重新读取消息。

所以我们认为我们可以依靠 SinkTask#flush 通过抛出异常或告诉 Connect 不要提交偏移量但在下一个重试的东西来有效地退出提交接收消息的特定轮询/周期的偏移量投票。

但正如我们发现 flush 实际上是基于时间的,并且或多或少独立于民意调查,它会在达到特定时间阈值时提交偏移量。

在 0.10.2 中引入了 SinkTask#preCommit,因此我们认为可以将其用于我们的目的。但是文档中没有提到SinkTask#putSinkTask#preCommit 之间存在1:1 的关系。

因为本质上我们希望commit offsets 尽快成为一个put succeeds。同样,如果特定的 put 失败,提交偏移量。

如果不是通过SinkTask#preCommit,如何做到这一点?

【问题讨论】:

    标签: apache-kafka offset flush apache-kafka-connect


    【解决方案1】:

    正确地将数据传入和传出 Kafka 可能具有挑战性,而 Kafka Connect 使这变得更容易,因为它使用了最佳实践并隐藏了许多复杂性。对于接收器连接器,Kafka Connect 从主题读取消息,将它们发送到您的连接器,然后定期提交已读取和处理的各种主题分区的最大偏移量。

    请注意,“将它们发送到您的连接器”对应于 put(Collection<SinkRecord>) 方法,并且在 Kafka Connect 提交偏移量之前可能会多次调用该方法。您可以控制 Kafka Connect 提交偏移量的频率,但 Kafka Connect 确保仅当连接器成功处理该消息时,它才会提交该消息的偏移量。

    当连接器名义上运行时,一切都很好,并且您的连接器会看到每条消息一次,即使定期提交偏移量也是如此。但是,如果连接器失败,那么当它重新启动时,连接器将从 最后提交的偏移量处开始。这可能意味着您的连接器会看到一些它在崩溃前处理的相同消息。如果您仔细编写连接器以使其具有至少一次语义,这通常不是问题。

    为什么 Kafka Connect定期而不是每条记录提交偏移量?因为它节省了很多工作,并且在名义上什么时候进行并不重要。只有当出现问题时,偏移滞后才重要。即便如此,如果您让 Kafka Connect 处理偏移量,您的连接器需要准备好处理消息至少一次。一次是可能的,但您的连接器必须做更多的工作(见下文)。

    写作记录

    您在编写连接器时有很大的灵活性,这很好,因为很大程度上取决于它正在写入的外部系统的功能。让我们看看实现putflush的不同方式。

    如果系统支持事务或可以处理一批更新,则连接器的put(Collection<SinkRecord>) 可以使用单个事务/批处理写入该集合中的所有记录,并根据需要重试多次,直到事务/批处理完成或在最终抛出错误之前。在这种情况下,put 完成所有工作,要么成功,要么失败。如果成功,则 Kafka Connect 知道所有记录都已正确处理,因此可以(在某个时候)提交偏移量。如果您的 put 调用失败,则 Kafka Connect 假定不知道是否处理了任何记录,因此它不会更新其偏移量并停止您的连接器。您的连接器的 flush(...) 不需要做任何事情,因为 Kafka Connect 正在处理所有偏移量。

    如果系统不支持事务,而您只能一次提交一个,您可能会让连接器的put(Collection<SinkRecord>) 尝试单独写出每条记录,直到它成功并在抛出错误之前根据需要重试。同样,put 完成所有工作,flush 方法可能不需要做任何事情。

    到目前为止,我的示例完成了put 中的所有工作。您始终可以选择让put 简单地缓冲记录,然后在flushpreCommit 中完成写入外部服务 的所有工作。您可能会这样做的一个原因是您的写入是基于时间的,就像flushpreCommit。如果您不希望您的写入是基于时间的,您可能不想在flushpreCommit 中进行写入。

    记录偏移或不记录

    如上所述,默认情况下,Kafka Connect 会定期记录偏移量,以便在重新启动时连接器可以从上次停止的地方开始。

    但是,有时需要连接器在外部系统中记录偏移量,尤其是当这可以自动完成时。当这样的连接器启动时,它可以在外部系统中查找最后写入的偏移量,然后可以告诉 Kafka Connect 它想从哪里开始读取。使用这种方法,您的连接器可能能够只进行一次消息处理。

    当接收连接器执行此操作时,它们实际上根本不需要 Kafka Connect 来提交任何偏移量。 flush 方法只是让您的连接器有机会了解 Kafka Connect 为您提交的偏移量,并且由于它不返回任何内容,因此无法修改这些偏移量或告诉 Kafka Connect 连接器正在处理哪些偏移量。

    这就是 preCommit 方法的用武之地。它实际上是 flush 的替代品(它实际上采用与 flush 相同的参数),除了它预计会返回 Kafka Connect 应提交的偏移量.默认情况下,preCommit 只调用flush,然后返回传递给preCommit 的相同偏移量,这意味着Kafka Connect 应该提交它通过preCommit 传递给连接器的所有偏移量。但是,如果您的 preCommit 返回一组空的偏移量,那么 Kafka Connect 将根本不记录任何偏移量。

    因此,如果您的连接器要处理外部系统中的所有偏移量并且不需要 Kafka Connect 记录任何内容,那么您应该覆盖 preCommit 方法而不是 flush,并返回一个空集偏移量。

    【讨论】:

    • 源连接器中的“List poll()”方法是否相同? Kafka Connect 在将 SourceRecords 列表写入目标主题后不会立即刷新偏移量?
    猜你喜欢
    • 2016-11-20
    • 2020-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-04
    • 2021-11-15
    • 2021-11-14
    相关资源
    最近更新 更多