【问题标题】:What is the Apache Beam way to handle 'routing'Apache Beam 处理“路由”的方式是什么
【发布时间】:2018-11-08 18:39:51
【问题描述】:

我正在使用 Apache Beam 进行计算 - 如果成功,我想将输出写入一个接收器,如果失败,我想将其写入另一个接收器。

有没有办法在 Apache Beam 中处理元数据或基于内容的路由?

我已经广泛使用了 Apache Camel,因此在我看来,基于先前转换的结果,我应该使用 router 将消息路由到不同的接收器(可能由我在消息头)。 Apache Beam 是否有类似的功能,或者我是否只需要一个顺序转换来检查 PCollection 并在转换中处理写入接收器的操作?

理想情况下,我想要这样的逻辑(为了清楚起见,写得比较详细)

result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
result | ([success_failure_router]
   | 'sucess_sink' >> beam.io.WriteToText('/path/to/file')
   | 'failure_sink' >> beam.io.WriteStringsToPubSub('mytopic'))

但是.. 我怀疑“Beam”的处理方式是

result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
result | 'write_results_appropriately' >> write_results_appropriately(result))
...
def write_results_appropriately(result):
   if result == ..:
      # success, write to file
   else:
      # failure, write to topic

谢谢, 凯文

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    高层:

    在这种情况下,我不确定 Python API 的具体细节,但从高层次来看,它看起来像这样:

    • par-dos 支持多个输出;
    • 输出由您提供的标签标识(例如“正确元素”、“无效元素”);
    • 在您的主要标准中,您是否写入多个输出,使用您的标准选择输出;
    • 每个输出都由一个单独的PCollection 表示;
    • 然后你会得到单独的PCollections,代表你的par-do的标记输出;
    • 然后对每个标记的PCollections 应用不同的接收器;

    详细见章节 https://beam.apache.org/documentation/programming-guide/#additional-outputs

    【讨论】:

      猜你喜欢
      • 2020-10-06
      • 2017-09-20
      • 2016-05-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多