【问题标题】:How to generate gcs files one after another with google cloud dataflow and java?如何用google cloud dataflow和java依次生成gcs文件?
【发布时间】:2021-02-12 08:28:55
【问题描述】:

我有一个管道,其中一个 gcs 文件作为输入并生成两个 gcs 输出文件。
一个输出文件包含错误信息,另一个包含正常信息。
而且我有两个输出文件的 gcs 触发器的云功能。
仅当错误信息文件为 0 字节时,我才想对普通信息文件执行某些操作。
所以我必须让错误信息文件早于正常信息文件生成,以检查错误信息文件的大小。

现在我使用 2 TextIO.Write 来生成这两个文件。
但是我无法控制先生成哪个。
在云函数中,我让普通信息文件通过重试检查错误信息文件的大小。
但是云功能的超时限制为 540 秒,因此在生成错误信息文件之前我无法重试。
如何在 Cloud Dataflow 中处理此问题?
我可以在正常信息文件之前以编程方式生成错误信息文件吗?

【问题讨论】:

    标签: java google-cloud-dataflow apache-beam apache-beam-io


    【解决方案1】:

    您可以通过使用边输入来完成这样的排序。例如,

    error_pcoll = ...
    good_data_pcoll = ...
    
    error_write_result = error_pcoll | beam.io.WriteToText(...)
    (good_data_pcoll
     | beam.Map(
           # This lambda simply emits what it was given.
           lambda element, blocking_side: element,
           # This side input isn't used,
           # but will force error_write_result to be computed first.
           blocking_side=beam.pvalue.AsIterable(error_write_result))
     | beam.io.WriteToText(...))
    

    WaitPTransform 封装了这种模式。

    【讨论】:

    • 我阅读了“The Wait PTransform”的文档。 doc中的示例代码(“Wait.on(firstWriteResults)”)需要一个PCollection的参数,但是TextIO.Write的结果是PDone。
    • 哦,我忘了 Java 的 Write 没有这么灵活。您可以尝试使用 writeCustomType:beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/io/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-18
    • 2015-08-01
    • 1970-01-01
    • 2021-10-30
    • 1970-01-01
    相关资源
    最近更新 更多