【发布时间】:2019-11-09 15:24:02
【问题描述】:
我在 Python 上使用 Apache Beam,想问一下 Python SDK 上的 Apache Beam Java Wait.on() 是什么?
目前我对下面这段代码 sn-p 有问题
if len(output_pcoll) > 1:
merged = (tuple(output_pcoll) |
'MergePCollections1' >> beam.Flatten())
else:
merged = output_pcoll[0]
outlier_side_input = self.construct_outlier_side_input(merged)
(merged |
"RemoveOutlier" >>
beam.ParDo(utils.Remove_Outliers(),
beam.pvalue.AsDict(outlier_side_input)) |
"WriteToCSV" >>
beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
self.OUTPUT), num_shards=1))
似乎 Apache Beam 不会等到 self.construct_outlier_side_input 上的代码完成执行,并在下一个管道中执行“RemoveOutlier”时导致空侧输入。在Java版本中可以使用Wait.On()等待construct_outlier_side_input完成执行,但是我在Python SDK中找不到等效的方法。
--编辑-- 我要实现的目标与此链接中的几乎相同, https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task
【问题讨论】:
-
欢迎来到stackoverflow。我也是python的新手,我想让你确定的是这个问题主要是关于时间而不是其他的。我的理解是你必须在python中实现多线程,否则代码将同步执行;在其他之后的声明。您应该首先测试,如果您在删除异常值之前等待 x 秒,您的代码行为符合您的预期,您可以使用 time.sleep(x),您也可以查看此链接:blog.miguelgrinberg.com/post/how-to-make-python-wait
-
你能解释一下你的用例吗?您是否尝试从合并数据集中删除异常值?
-
HasnaaIbraheem 谢谢,我在发布之前尝试过,但仍然没有成功。 @jjayadeep,是的,我正在尝试从合并的数据集中删除异常值,但我要删除的异常值是本地异常值。因此,从合并的数据集中,我构建了一个字典,其中包含作为键的项目名称和作为字典值的值列表,然后在删除异常值时使用该字典作为辅助输入
-
我认为这是在流式传输管道中?如果是,您要应用于 SideInput 数据的窗口是什么?
-
@RezaRokni 不是,这是批处理管道
标签: python google-cloud-dataflow apache-beam