【发布时间】:2021-07-21 21:38:57
【问题描述】:
我们正在构建写入 Cloud Storage 的 Apache Beam (Java SDK) 管道。我们使用TextIO.write() 转换来写入存储。在此操作中,我们希望根据当前日期时间动态更改存储文件的子目录。
这是流式传输管道的一部分。理想情况下,我们希望部署它并让 Beam 作业根据处理日期时间动态更改保存文件的文件夹子目录。
我们当前的管道转换如下所示:
DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));
此代码的问题在于,该函数返回的 DateTime 值与管道部署到 Google Cloud Dataflow 时的值相同。我们希望根据处理传入消息时的日期时间动态更改子目录结构。
我打算:
- 从 ParDo 函数中获取日期时间。
- 创建一个新的 ParDo 函数并将消息用作主要输入,并将来自其他 ParDo 函数的日期时间作为辅助输入传递。
这是最好的方法吗? Apache Beam 中是否有内置工具可以解决我们的用例?
【问题讨论】:
-
感谢您的回复。我仍然不清楚如何使用 ValueProvider.RuntimeValueProvider 类在运行时创建子目录路径。你能告诉我如何将
dftOut.print(...)方法包装到 ValueProvider 类中吗?
标签: java google-cloud-storage google-cloud-dataflow apache-beam