【问题标题】:How to add suffix to final completed file generated by BucketingSink in Apache Flink?如何为 Apache Flink 中 BucketingSink 生成的最终完成文件添加后缀?
【发布时间】:2018-06-20 15:11:00
【问题描述】:

我使用 Apache Flink 在 HDFS 上创建了一些存档数据文件,生成的文件名具有类似 part-{parallel-task}-{count} 的模式,但我期望应该有“.gz”后缀,可以直接加载阿帕奇火花。

我在 Apache Flink 中找不到任何 API 为 BucketingSink 生成的最终完成文件添加后缀,但只能为 InProgress、Pending 和 ValidLength 状态添加后缀。任何人都可以帮忙吗? HDFS Connector & Java API

【问题讨论】:

    标签: apache-flink flink-streaming bucket sink suffix


    【解决方案1】:

    据我所知,没有使用默认 BucketingSink 添加后缀的选项。

    一种选择是不使用检查点并将待处理的后缀设置为所需的后缀。但由于在大多数情况下都需要检查点,因此这不是最佳选择。

    我的解决方案是创建一个 BucketingSinkWithSuffix 实现,它几乎是默认 BucketingSink 的精确副本。唯一需要改变的是为后缀添加一个成员变量,可以在构造函数中设置并调整基本路径的创建方式。

    这是我对构造函数的实现:

        public BucketingSinkWithSuffix(String basePath, String suffix) {
        this.basePath = basePath;
        this.bucketer = new DateTimeBucketer<>();
        this.writerTemplate = new StringWriter<>();
        this.partSuffix = suffix;
    }
    

    以及用于生成基本路径(第 523 和 528 行):

    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-05-20
      • 2012-12-04
      • 2019-02-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-25
      • 2010-10-27
      相关资源
      最近更新 更多