【问题标题】:How to commit offset manually in Kafka Sink Connector如何在 Kafka Sink 连接器中手动提交偏移量
【发布时间】:2020-02-18 15:37:09
【问题描述】:

我有一个 Kafka Sink Task,它正在通过以下方式收听 Kafka 主题 put() 方法。
但是我不想自动提交偏移量,因为一旦从 Kafka 获取记录,我就有一些处理逻辑。
从 Kafka 获取记录后,如果处理成功,那么我只想提交偏移量,否则它应该再次从相同的偏移量读取。

我可以看到 Kafka 消费者中有方法 commitSync(),但在 Sink Connector 中找不到相同的替代方法。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api apache-kafka-connect


    【解决方案1】:

    添加此属性:("enable.auto.commit", "false")

    enable.auto.commit 的默认值为 true,第二个属性 auto.commit.interval.ms 的默认值为 5000

    【讨论】:

    • Connect worker 属性中必须是 consumer.enable.auto.commit
    • 我说的是 Kafka Sink Connector.. Kafka Connector 工作任务。它不是一个独立的消费者。我想知道如果记录处理失败或在 Sink Connector 中抛出任何异常,如何手动提交偏移量或从相同的偏移量重试。
    【解决方案2】:

    接收 Kafka 连接器-提交

    如果 option(enable.auto.commit) 为 False,则根据下面的 option(offset.flush.interval.ms) 每 60 秒自动提交一次。如果你的put()方法没有错误,就会正常提交。

    offset.flush.interval.ms
    Interval at which to try committing offsets for tasks.
    
    Type: long
    Default: 60000
    Importance: low
    

    在 Sink Kafka 中管理偏移量

    Kafka Connect 应该提交它通过 preCommit 传递给连接器的所有偏移量。但是,如果您的 preCommit 返回一组空的偏移量,那么 Kafka Connect 将根本不记录任何偏移量。 enter link description here

    SinkTask.java
    
    /**
     * Pre-commit hook invoked prior to an offset commit.
     *
     * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable.
     *
     * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}},
     *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
     *                       passed to {@link #put}.
     *
     * @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition.
     */
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        flush(currentOffsets);
        return currentOffsets;
    }
    

    SinkTaskContext.java
    
    /**
     * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
     * by requesting an offset commit as soon as they flush data to the destination system.
     *
     * This is a hint to the runtime and no timing guarantee should be assumed.
     */
    void requestCommit();
    

    【讨论】:

      猜你喜欢
      • 2021-04-06
      • 2021-10-16
      • 1970-01-01
      • 2015-04-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多