【问题标题】:Computing GroupBy once then passing it to multiple transformations in Google DataFlow (Python SDK)计算 GroupBy 一次,然后将其传递给 Google DataFlow(Python SDK)中的多个转换
【发布时间】:2019-04-02 16:04:03
【问题描述】:

我正在使用 Python SDK for Apache Beam 在 Google DataFlow 上运行特征提取管道。我需要运行多个转换,所有这些转换都希望项目按键分组。

基于对question 的回答,DataFlow 无法自动发现和重用像 GroupBy 这样的重复转换,所以我希望先运行 GroupBy,然后将结果 PCollection 提供给其他转换(参见下面的示例代码)。

我想知道这是否应该在 DataFlow 中有效地工作。如果没有,Python SDK 中推荐的解决方法是什么?有没有一种有效的方法让多个 Map 或 Write 转换获取相同 GroupBy 的结果?就我而言,我观察到 DataFlow 在利用率为 5% 时扩展到最大工作人员数量,并且在此 question 中描述的 GroupBy 之后的步骤中没有取得任何进展。

示例代码。为简单起见,仅显示了 2 个转换。

# Group by key once.
items_by_key = raw_items | GroupByKey()

# Write groupped items to a file.
(items_by_key | FlatMap(format_item) | WriteToText(path))

# Run another transformation over the same group.
features = (items_by_key | Map(extract_features))

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow apache-beam


    【解决方案1】:

    将单个GroupByKey 步骤的输出输入到多个转换中应该可以正常工作。但是您可以获得的并行化量取决于原始GroupByKey 步骤中可用的密钥总数。如果任何一个下游步骤是高扇出,请考虑在这些步骤之后添加一个Reshuffle 步骤,这将允许 Dataflow 进一步并行执行。

    例如,

    pipeline | Create([<list of globs>]) | ParDo(ExpandGlobDoFn()) | Reshuffle() | ParDo(MyreadDoFn()) | Reshuffle() | ParDo(MyProcessDoFn())
    

    这里,

    • ExpandGlobDoFn:扩展输入 glob 并生成文件
    • MyReadDoFn:读取给定文件
    • MyProcessDoFn:处理从文件中读取的元素

    我在这里使用了两个Reshuffles(请注意Reshuffle 中有一个GroupByKey)以允许(1)从给定的glob 并行读取文件(2)并行处理来自给定文件的元素。

    【讨论】:

    • 感谢您的反馈!您能否举例说明在 GroupBy 之后可能需要 Reshuffle() 的情况?只是想确保我可以正确识别应该使用的模式。
    • 添加了一个例子。谢谢。
    • 非常感谢。真的很感激这个例子。是否有计划让 DataFlow 足够智能以检测何时需要“重新洗牌”?我有一个管道,其中 DataFlow 启动所有可用的 VM 并将它们保持在 5% 的利用率,在 GroupBy 和下游步骤中几乎没有任何进展。我将尝试查看在 GroupBy 之前运行 reshuffle 是否有帮助。 Pablo 建议在计数步骤中使用组合器,但没有帮助。这是问题stackoverflow.com/questions/55401268/…。可以的话请看一下。
    • 更新:DataFlow 无法正确处理来自单个 GroupBy 的输出到多个转换中。见stackoverflow.com/a/55527648/3745936。常识表明它应该工作并且是一个合理的优化,但实际上,这样的管道将永远卡在分配的所有可用工人身上。我提交了一个错误issuetracker.google.com/issues/129457239,它被关闭为“按预期工作”。我不认为 Dataflow 工程师有意做出如此糟糕的行为。
    【解决方案2】:

    根据我对this SO question 进行故障排除的经验,在多个转换中重复使用 GroupBy 输出可能会使您的管道极其缓慢。至少这是我使用 Apache Beam SDK 2.11.0 for Python 的经验。

    常识告诉我,从执行图中的单个 GroupBy 分支应该会使我的管道运行得更快。在 120 多个工作人员上运行 23 小时后,管道无法取得任何重大进展。我尝试添加重新洗牌,在可能的情况下使用组合器并禁用实验性洗牌服务。

    在我将管道分成两条之前,没有任何帮助。第一个管道计算 GroupBy 并将其存储在一个文件中(我需要将其“按原样”摄取到数据库中)。第二个使用 GroupBy 输出读取文件,读取其他输入并运行进一步的转换。结果 - 所有转换都在 2 小时内成功完成。我想如果我只是在原始管道中复制 GroupBy,我可能会获得相同的结果。

    我想知道这是否是 DataFlow 执行引擎或 Python SDK 中的错误,或者它按预期工作。如果是设计好的,那么至少应该记录在案,这样的管道在提交的时候不应该被接受,否则应该有警告。

    您可以通过查看“组关键字”步骤中的 2 个分支来发现此问题。看起来解决方案是分别为每个分支重新运行 GroupBy。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-02
      • 1970-01-01
      • 2021-11-25
      相关资源
      最近更新 更多