【问题标题】:Applying multiple PTransforms on one PCollection simultaneously in Apache Beam pipeline在 Apache Beam 管道中同时在一个 PCollection 上应用多个 PTransform
【发布时间】:2019-12-10 02:01:16
【问题描述】:

我正在尝试创建一个光束管道,以在一个 PCollection 上同时应用多个 ParDo 变换,并将所有结果收集并打印到一个列表中。到目前为止,我已经经历了顺序过程,比如第一个 ParDo,然后是第二个 ParDo。 这是我为我的问题准备的一个例子:

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions())

class Tr1(beam.DoFn):
  def process(self, number):
    number = number + 1
    yield number

class Tr2(beam.DoFn):
  def process(self, number):
    number = number + 2
    yield number

def pipeline_test():

  numbers =  p | "Create" >> beam.Create([1])
  tr1 = numbers  | "Tr1" >> beam.ParDo(Tr1())
  tr2 = numbers  | "Tr2" >> beam.ParDo(Tr2())

  tr1 | "Print1" >> beam.Map(print)
  tr2 | "Print2" >> beam.Map(print) 

def main(argv):
  del argv

  pipeline_test()

  result = p.run()
  result.wait_until_finish()
if __name__ == '__main__':
  app.run(main)

【问题讨论】:

  • 您能否附上您看到 PTransform 正在按顺序运行的数据流作业图?

标签: python apache-beam


【解决方案1】:

转换和元素的调度由用于运行管道的运行器管理。

跑步者通常会尝试优化图表,并可能按顺序或并行运行某些任务。

在您的情况下,Tr1 和 Tr2 都是无状态的,并且应用于相同的输入。在这种情况下,运行程序通常会在同一台机器上为相同的元素按顺序运行它们。 注意,runner 仍然会并行运行不同的元素。

它应该看起来像这样。

线程 1 ele1 -> Tr1 -> Tr2

线程 2 ele1 -> Tr1 -> Tr2

我不建议依赖管道不同部分的预期并行度,因为它取决于运行器。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 2020-06-14
    • 1970-01-01
    • 2022-12-31
    • 2023-01-13
    相关资源
    最近更新 更多