【问题标题】:How to add suffix to final completed file generated by BucketingSink in Apache Flink?如何为 Apache Flink 中 BucketingSink 生成的最终完成文件添加后缀?
【发布时间】:2018-06-20 15:11:00
【问题描述】:
我使用 Apache Flink 在 HDFS 上创建了一些存档数据文件,生成的文件名具有类似 part-{parallel-task}-{count} 的模式,但我期望应该有“.gz”后缀,可以直接加载阿帕奇火花。
我在 Apache Flink 中找不到任何 API 为 BucketingSink 生成的最终完成文件添加后缀,但只能为 InProgress、Pending 和 ValidLength 状态添加后缀。任何人都可以帮忙吗? HDFS Connector & Java API
【问题讨论】:
标签:
apache-flink
flink-streaming
bucket
sink
suffix
【解决方案1】:
据我所知,没有使用默认 BucketingSink 添加后缀的选项。
一种选择是不使用检查点并将待处理的后缀设置为所需的后缀。但由于在大多数情况下都需要检查点,因此这不是最佳选择。
我的解决方案是创建一个 BucketingSinkWithSuffix 实现,它几乎是默认 BucketingSink 的精确副本。唯一需要改变的是为后缀添加一个成员变量,可以在构造函数中设置并调整基本路径的创建方式。
这是我对构造函数的实现:
public BucketingSinkWithSuffix(String basePath, String suffix) {
this.basePath = basePath;
this.bucketer = new DateTimeBucketer<>();
this.writerTemplate = new StringWriter<>();
this.partSuffix = suffix;
}
以及用于生成基本路径(第 523 和 528 行):
partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);