【问题标题】:Wait.On() on Apache Beam Python SDK versionApache Beam Python SDK 版本上的 Wait.On()
【发布时间】:2019-11-09 15:24:02
【问题描述】:

我在 Python 上使用 Apache Beam,想问一下 Python SDK 上的 Apache Beam Java Wait.on() 是什么?

目前我对下面这段代码 sn-p 有问题

    if len(output_pcoll) > 1:
        merged = (tuple(output_pcoll) |
                  'MergePCollections1' >> beam.Flatten())
    else:
        merged = output_pcoll[0]

    outlier_side_input = self.construct_outlier_side_input(merged)

    (merged |
     "RemoveOutlier" >>
     beam.ParDo(utils.Remove_Outliers(),
                beam.pvalue.AsDict(outlier_side_input)) |
     "WriteToCSV" >>
     beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
                         self.OUTPUT), num_shards=1))

似乎 Apache Beam 不会等到 self.construct_outlier_side_input 上的代码完成执行,并在下一个管道中执行“RemoveOutlier”时导致空侧输入。在Java版本中可以使用Wait.On()等待construct_outlier_side_input完成执行,但是我在Python SDK中找不到等效的方法。

--编辑-- 我要实现的目标与此链接中的几乎相同, https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task

【问题讨论】:

  • 欢迎来到stackoverflow。我也是python的新手,我想让你确定的是这个问题主要是关于时间而不是其他的。我的理解是你必须在python中实现多线程,否则代码将同步执行;在其他之后的声明。您应该首先测试,如果您在删除异常值之前等待 x 秒,您的代码行为符合您的预期,您可以使用 time.sleep(x),您也可以查看此链接:blog.miguelgrinberg.com/post/how-to-make-python-wait
  • 你能解释一下你的用例吗?您是否尝试从合并数据集中删除异常值?
  • HasnaaIbraheem 谢谢,我在发布之前尝试过,但仍然没有成功。 @jjayadeep,是的,我正在尝试从合并的数据集中删除异常值,但我要删除的异常值是本地异常值。因此,从合并的数据集中,我构建了一个字典,其中包含作为键的项目名称和作为字典值的值列表,然后在删除异常值时使用该字典作为辅助输入
  • 我认为这是在流式传输管道中?如果是,您要应用于 SideInput 数据的窗口是什么?
  • @RezaRokni 不是,这是批处理管道

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

您可以使用 Beam 的附加输出功能来执行此操作。

示例代码sn-p如下

results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
           .with_outputs('above_cutoff_lengths', 'marked strings',
                         main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings']  # indexing works as well

运行上面的代码 sn-p 后,您会得到多个 PCollection,例如下面、上面和标记。然后,您可以使用边输入来进一步过滤或加入结果

希望对您有所帮助。

更新

基于 cmets,我想提一下 Apache Beam 能够在 ValueStateBagState 的帮助下进行状态处理。如果要求是通读 PCollection,然后根据是否存在先验值做出决定,则可以通过BagState 处理此类要求,如下所示:-

def process(self,
              element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):

    # Do you processing here
    key, value = element
    # Read all the data from buffer1
    all_values_in_buffer_1 = [x for x in buffer_1.read()]

    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
        # clear the buffer data if required conditions are met.
        buffer_1.clear()

    # add the value to buffer 2
    buffer_2.add(value)

    if StatefulDoFn._all_condition_met():
      # Clear the timer if certain condition met and you don't want to trigger
      # the callback method.
      watermark_timer.clear()

    yield element

  @on_timer(WATERMARK_TIMER)
  def on_expiry_1(self,
                  timestamp=beam.DoFn.TimestampParam,
                  window=beam.DoFn.WindowParam,
                  key=beam.DoFn.KeyParam,
                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
    # Window and key parameters are really useful especially for debugging issues.
    yield 'expired1'

【讨论】:

  • 不幸的是,这个解决方案对我不起作用,因为传递给我的 Remove_Outliers() 将是 pcollection 中的每个元素,并构造像这样的字典 { item: [price1, price2, ...,priceN] } 我需要保存相同的状态,例如字典中是否存在特定项目,如果存在则附加列表。并且不支持在 Apache Beam 中保存状态。
  • @ruka - Beam 提供有状态处理语义。我已经用您可以参考的详细信息更新了我的答案。如果有帮助,请接受答案。
  • 对不起,我忘了说我的管道是批处理管道。谢谢你的详细解释
  • 您也可以将BagStateValueState 用于批处理管道。您可以在此处找到更多详细信息 - beam.apache.org/blog/2017/02/13/stateful-processing.html。如果这个答案有帮助,请接受我的回答。
猜你喜欢
  • 2022-01-01
  • 1970-01-01
  • 2019-10-05
  • 2018-01-03
  • 1970-01-01
  • 2023-03-25
  • 1970-01-01
  • 2023-03-11
  • 1970-01-01
相关资源
最近更新 更多