【问题标题】:Move file after Pipeline has run管道运行后移动文件
【发布时间】:2015-10-10 14:00:58
【问题描述】:

数据流管道完成运行后是否可以在 GCS 中移动文件?如果是这样,怎么做?应该是最后一个.apply?我无法想象会是这样。

这里的情况是我们从客户端导入大量 .csv。我们需要无限期地保留这些 CSV,因此我们需要“将 CSV 标记为已处理”,或者将它们移出 TextIO 用于查找 csv 的初始文件夹。我目前唯一能想到的是将文件名存储在 BigQuery 中(我不确定我是如何得到这个的,我是 DF 新手),然后从执行中排除已经存储的文件管道不知何故?但必须有更好的方法。

这可能吗?我应该检查什么?

感谢您的帮助!

【问题讨论】:

    标签: google-cloud-dataflow


    【解决方案1】:

    您可以尝试使用BlockingDataflowPipelineRunner 并在p.run() 之后在您的主程序中运行任意逻辑(它将等待管道完成)。

    参见Specifying Execution Parameters,特别是“阻塞执行”部分。

    但是,一般来说,您似乎真的想要一个持续运行的管道,该管道可以监视包含 CSV 文件的目录并在新文件出现时导入它们,而不是两次导入同一个文件。这对于流式管道来说是一个很好的例子:你可以编写一个自定义的UnboundedSource(另见Custom Sources and Sinks)来监视一个目录并返回其中的文件名(即T可能是String或@ 987654330@):

    p.apply(Read.from(new DirectoryWatcherSource(directory)))
     .apply(ParDo.of(new ReadCSVFileByName()))
     .apply(the rest of your pipeline)
    

    其中DirectoryWatcherSource 是您的UnboundedSourceReadCSVFileByName 也是您需要编写的转换,它采用文件路径并将其作为 CSV 文件读取,并返回其中的记录(不幸的是,现在您不能在管道中间使用像 TextIO.Read 这样的转换,只能在开始时使用 - 我们正在努力解决这个问题。

    这可能有点棘手,正如我所说,我们正在开发一些功能以使其更简单,我们正在考虑创建一个这样的内置源,但目前这可能仍然是比“弹球工作”更容易。请尝试一下,如果有任何不清楚的地方,请通过dataflow-feedback@google.com 告诉我们!

    同时,您还可以在 Cloud Bigtable 中存储有关您已处理或未处理的文件的信息 - 它比 BigQuery 更适合,因为它更适合随机写入和查找,而 BigQuery 是更适合对整个数据集进行大批量写入和查询。

    【讨论】:

    • 谢谢,我的印象是BlockingDataflowPipelineRunner 是一个异步操作(应该仔细考虑名称的“阻塞”部分)。感谢您清除它。既然你提到了持续的目录监视,你是说现在不可能吗?然后,我们可能每隔几个小时就会有几个弹球作业执行前面提到的检查(文件是否已经完成),但这可能会导致一些竞争条件 - 有什么建议吗?
    • 我编辑了我的答案以提供另一种选择,并详细解释潜在的“正在开发中”的功能。
    • 好极了,现在更有意义了!谢谢!
    • @jkff 自 2015 年提出原始问题以来,Google Cloud Dataflow 中是否已实施任何新功能?为管道执行通常的 ETL 前/后处理步骤似乎真的很麻烦而且很麻烦。今天的最佳实践方法是什么?
    • 我认为我对另一个问题的回答在这里也很有意义:stackoverflow.com/questions/36365058/…
    猜你喜欢
    • 1970-01-01
    • 2015-05-17
    • 2017-07-21
    • 2021-04-06
    • 2020-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多