【问题标题】:Write to Cloud Storage from Dataflow with dynamic datetime使用动态日期时间从 Dataflow 写入 Cloud Storage
【发布时间】:2021-07-21 21:38:57
【问题描述】:

我们正在构建写入 Cloud Storage 的 Apache Beam (Java SDK) 管道。我们使用TextIO.write() 转换来写入存储。在此操作中,我们希望根据当前日期时间动态更改存储文件的子目录。

这是流式传输管道的一部分。理想情况下,我们希望部署它并让 Beam 作业根据处理日期时间动态更改保存文件的文件夹子目录。

我们当前的管道转换如下所示:

DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");

myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));

此代码的问题在于,该函数返回的 DateTime 值与管道部署到 Google Cloud Dataflow 时的值相同。我们希望根据处理传入消息时的日期时间动态更改子目录结构。

我打算:

  1. 从 ParDo 函数中获取日期时间。
  2. 创建一个新的 ParDo 函数并将消息用作主要输入,并将来自其他 ParDo 函数的日期时间作为辅助输入传递。

这是最好的方法吗? Apache Beam 中是否有内置工具可以解决我们的用例?

【问题讨论】:

标签: java google-cloud-storage google-cloud-dataflow apache-beam


【解决方案1】:

FileIO 提供 writeDynamic() 方法,该方法允许根据项目本身的内容将 pCollection 的每个项目定向到不同的目录或文件。

下面是我创建的一个简单示例,只是为了演示:

public class ExamplePipeline {
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);


    Create.Values<String> sampleData = Create.of("ABC","DEF", "GHI");

    pipeline.apply(sampleData)
            .apply("WritingDynamic", FileIO.<PartitionData, String>writeDynamic()
                    .by(event -> new PartitionData())
                    .withDestinationCoder(AvroCoder.of(PartitionData.class))
                    .via(Contextful.fn(event -> event), TextIO.sink())
                    .to("gs://my-bucket/")
                    .withNaming(partitionData -> FileIO.Write.defaultNaming(partitionData.getPath(), ".txt")));

    pipeline.run().waitUntilFinish();
}

public static class PartitionData implements Serializable {
    private static final long serialVersionUID = 4549174774076944665L;

    public String getPath() {
        LocalDateTime writtingMoment = LocalDateTime.now(ZoneOffset.UTC);
        int year = writtingMoment.getYear();
        int month = writtingMoment.getMonthValue();
        int day = writtingMoment.getDayOfMonth();

        return String.format("%d/%02d/%02d/", year, month, day);
    }
}

上面的代码将保存到一个结构中: gs://my-bucket/${year}/%{month}/%{day}/... .txt

by() 方法中,我使用了我的数据分区器,我称之为PartitionData

via() 应该返回你想要最终写入的内容。

to() 将成为您路径的基础部分。

withNaming 中,您真正构建了路径的最后部分。

通常情况下,我会在事件本身上设置时间戳,告诉它们何时在现实中发生,然后您可以从事件中获取,而不是使用 LocalDateTime.now

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-01
    相关资源
    最近更新 更多