【问题标题】:Can HdfsSink3Connector create duplicates?HdfsSink3Connector 可以创建重复项吗?
【发布时间】:2021-09-14 20:37:26
【问题描述】:

根据Documentation,Sink 连接器确保 Exactly-Once-Delivery。

在连接器任务线程失败的情况下如何确保Exact-Once-Delivery?

它会删除失败的任务线程创建的文件吗?还是将损坏/部分文件留在 HDFS 中?

连接器使用预写日志来确保每条记录仅写入一次 HDFS。此外,连接器通过将 Kafka 偏移量信息编码到 HDFS 文件中来管理偏移量,以便在失败和任务重新启动的情况下,它可以从最后提交的偏移量开始。

请帮帮我。

【问题讨论】:

    标签: apache-kafka hdfs apache-kafka-connect confluent-platform


    【解决方案1】:

    HDFS 连接器保存文件名中的偏移量并将它们返回到连接器中的使用者 api,以便知道它需要从哪里继续,这样它提供 Exactly Once 语义、EOS 并避免重复。

    /**

    • HDFS 连接器跟踪 HDFS 中文件名中的偏移量(用于 Exactly Once Semantics)作为最后一个
    • 写入 HDFS 中最后一个文件的记录偏移量。
    • 此方法返回 HDFS 中最后一个偏移量之后的下一个偏移量,对某些 API 很有用
    • (如 Kafka 消费者偏移跟踪)。
    • @return 上次写入 HDFS 的偏移量之后的下一个偏移量,如果没有文件被提交,则返回 -1
    • yet
      

    */

    https://github.com/confluentinc/kafka-connect-hdfs/blob/1d68023c38e17f0ed6f87f3b78d86c2e08f39909/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java

    正在从文件名中读取offset

    long lastCommittedOffsetToHdfs = FileUtils.extractOffset(
          fileStatusWithMaxOffset.getPath().getName());
      log.trace("Last committed offset based on filenames: {}", lastCommittedOffsetToHdfs);
      // `offset` represents the next offset to read after the most recent commit
      offset = lastCommittedOffsetToHdfs + 1;
      log.trace("Next offset to read: {}", offset);
    

    如果 hdfs 文件已写入磁盘,则在任务开始时,将从文件名中读取偏移量并从该点继续...

    如果文件还没有写入磁盘,在任务中它将重新开始读取并尝试将文件写入hdfs,成功时将提交偏移量,如果提交失败但文件存在于hdfs上,在任务开始时,它将从 hdfs 文件继续偏移 -

    【讨论】:

    • 谢谢。我明白这一点。在提交偏移量之前,如果任务线程死亡,正在写入的文件会发生什么
    • 如果 hdfs 文件已写入磁盘,则在任务开始时,将从文件名中读取偏移量并从该点继续...
    • 如果文件还没有写入磁盘,在任务中它将重新开始读取并尝试将文件写入hdfs,成功时将提交偏移量,如果提交失败但文件存在于 hdfs 上,它将在任务开始时从 hdfs 文件继续使用偏移量
    • 好的,非常感谢。我会在尝试后立即接受,从我这边 +1。
    猜你喜欢
    • 2019-02-24
    • 2021-03-21
    • 1970-01-01
    • 2016-01-06
    • 2021-03-23
    • 1970-01-01
    • 2014-04-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多