【问题标题】:How often put() is triggered in Kafka Connect sink tasks?Kafka Connect 接收器任务中多久触发一次 put()?
【发布时间】:2016-12-23 20:32:28
【问题描述】:

我可以控制触发我的 Kafka Connect Sink 任务的 put() 方法的时间间隔吗? Kafka Connect 框架在这方面的预期行为是什么?理想情况下,我想指定,例如,“不要打电话给我,除非你有 X 个新记录/Y 个新字节,或者自上次调用以来经过了 Z 毫秒”。这可能会使接收器任务中的批处理逻辑更简单(引用documentation,“在许多情况下,内部缓冲将很有用,因此可以一次发送整批记录,从而减少将事件插入到下游数据存储)。

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    今天,只有在 WorkerSinkTask 中调用 deliverMessages 时才会调用来自 SinkTask 的 put。好消息是deliverMessages 唯一发生的时间是在poll 内,因此您应该可以控制通过overriding consumer properties 轮询新记录的频率。

    如果您想进行内部缓冲,您可以在其implementation of SinkTask 中查看 HDFSConnector 如何处理此问题。但是,现在,Connect 会立即放入投票返回的所有记录。

    综上所述,如果您真的希望在消息到达下游系统之前对其进行批处理,您可以考虑查看offset.flush.interval.ms and offset.flush.timeout.ms,它控制着flush() 的调用频率。

    【讨论】:

      猜你喜欢
      • 2017-12-05
      • 2020-05-29
      • 2011-10-10
      • 1970-01-01
      • 2019-06-17
      • 2019-02-23
      • 2017-10-17
      • 2020-05-13
      • 2016-12-26
      相关资源
      最近更新 更多