【问题标题】:Dataflow: is it possible to run parts of the pipeline synchronously and other parts asynchronously?数据流:是否可以同步运行管道的一部分而异步运行其他部分?
【发布时间】:2016-12-21 17:47:31
【问题描述】:

我正在设置如下所示的简单数据流管道:

Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("source.csv"))
.apply(ParDo.of(new TransformEachLine()))
.apply(ParDo.of(new ProcessEachTransform()));
p.run();

如何让TransformEachLine 的每次迭代等待ProcessEachTransform 的每次迭代完成相应TransformEachLine 迭代产生的所有元素?另外,如何使TransformEachLine 的每次迭代按顺序执行(如源“.csv”文件中所示)

基本上,水平缩放和随机化 ProcessEachTransform 对我来说很重要,但不是同时随机化前两个 ParDos。

现在,该管道的假设输出如下所示:

Line B
   Processed item 2
   Processed item 1
   Processed item 3
Line A
   Processed Item 3
   Processed Item 1
   Processed Item 2
Line C
   Processed Item 1
   Processed Item 3
   Processed Item 2

如何按顺序对齐“第 XX 行”,但在每个“第 XX 行”中保持“已处理项目 XX”随机和并行化?这是否意味着我必须在本地执行部分代码并将其他部分发送到 Dataflow 进行并行处理?

【问题讨论】:

  • 您能否详细介绍一下您的用例:有多少“行”以及每行有多少项?为什么需要按顺序处理它们?您处理它们的顺序是否重要(与 .csv 文件中的顺序相同?),还是只有两条线不会同时得到处理?
  • 因为这些是金融交易。假设 (1) A 支付 B,然后 (2) B 支付 C(历史数据,以批处理方式处理)。如果 (2) 在 (1) 之前处理,您可能没有足够的资金,即使在现实生活中由于 (1) 而不是这种情况。因此,按顺序执行 (1) 和 (2) 很重要。同时,假设每笔交易涉及一百万次(或数次)操作。结果是处理(1)和(2)会很好,但是将每个子操作分布在一定数量的工作节点之间,因此它们被快速处理。

标签: google-cloud-dataflow


【解决方案1】:

听起来您实际上需要完全按照输入文件中指定的顺序处理事物。

这不是您可以在 Beam API 中直接表达的内容:PCollection 元素的处理始终是并行且无序的 - 您需要按顺序将每个事务作为单个管道运行(尽管如果事务的数量是低 - 最多数百个 - 您可以在管道本身的结构中表达顺序,例如每个事务 1 次转换和一些接线以使转换按顺序执行 - 如果您想知道这个,请告诉我我会编辑我的答案)。

请注意,Beam 模型也不保证元素将被处理多少次(例如,在失败重试或冗余的情况下,跑步者可以多次处理一个元素) ,因此您还需要使一个事务中的项目处理也是幂等的。

【讨论】:

  • 嗯,交易数量非常大,绝对不是数百,远不止于此。执行诸如调用 Pipeline.apply 或在顺序循环中重复创建/运行管道而不产生每次启动工作人员和进行支持工作的开销之类的事情是否合理?我明天打算玩它,但如果有适当的方法,知道会有所帮助。最坏的情况,可能只是分配最大的机器并以扁平方式在云中执行。
  • 不幸的是,Dataflow 运行器目前无法在不同管道之间重用工作人员。但是,您可能会更幸运地使用 Spark 运行器针对 Cloud Dataproc 上的 Spark 集群运行相同的 Beam 管道(相同的代码)。有关说明,请参阅github.com/apache/incubator-beam/tree/master/runners/spark
猜你喜欢
  • 1970-01-01
  • 2015-10-12
  • 1970-01-01
  • 2020-10-08
  • 2019-05-11
  • 2018-11-16
  • 2021-09-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多