【问题标题】:Writing text data from PCollection to GCS with custom file name使用自定义文件名将文本数据从 PCollection 写入 GCS
【发布时间】:2021-06-17 12:36:14
【问题描述】:

在用 Kotlin 编写的数据流作业中 使用 PubSub 订阅作为输入,我收到一个 Proto 对象(事件)并将该对象映射到字符串。

我的管道有类型:

PCollection<KV<Event, String>>

这些字符串是必须在 GCS 中写入的文件的行。 事件对象有一个必须用来设置文件名的“Id”和一个用于设置文件夹的“名称”。

是否可以使用 FileIO ?

pipeline.apply(
   FileIO.writeDynamic<String, String>()
     .to("gs://my-bucket")
     // withNaming?

)

我的目标是根据 Event 对象中的信息在正确的文件中写入正确的行

【问题讨论】:

    标签: kotlin google-cloud-storage apache-beam dataflow


    【解决方案1】:

    可以通过向withNaming() API 提供FileNaming 实现来自定义文件名。

    但是,目前不支持将输入元素直接映射到最终文件名。输入元素可以使用dynamic destinations API 映射到组,您可以为每个组提供文件命名策略。

    要使用输入元素值完全自定义命名,您可能需要实施新的接收器转换。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多