【问题标题】:Apache Beam in python: How to reuse exactly the same transform on another PCollectionpython中的Apache Beam:如何在另一个PCollection上重用完全相同的变换
【发布时间】:2018-10-26 17:08:43
【问题描述】:

我的几个 PCollection(来自不同来源)必须以相同的方式解码。

hits = (msgs | 'Parse' >> beam.Map(parse)
    | 'Decode' >> beam.Map(decode_hit))

然后:

dummy_hits = (dummy_msgs | 'Parse' >> beam.Map(parse)
    | 'Decode' >> beam.Map(decode_hit))

如果我可以重用这些转换,那将是非常好的,这要归功于我之前给它们提供的名称。我天真地尝试了这个:

dummy_hits = (dummy_msgs | 'Parse'
    | 'Decode')

但我的管道无法构建。 (TypeError:需要一个 PTransform 对象,得到 Parse)。

我认为这是可能的,因为管道模块的文档指出:“如果需要应用相同的转换实例,则右移运算符 应用于指定新名称(例如input | "label" >> my_tranform)”

这样做的方法是什么?只有这样吗?

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    名称必须是唯一的,但由于您的步骤顺序相同,因此您可能希望创建这样的复合变换

    https://beam.apache.org/get-started/wordcount-example/#creating-composite-transforms

    这样做:

    class ParseDecode(beam.PTransform):
    
      def expand(self, pcoll):
        return (pcoll
                | 'Parse' >> beam.Map(parse)
                | 'Decode' >> beam.Map(decode_hit))
    

    这样你就可以这样做了:

    hits = (msgs | 'Parse msgs' >> ParseDecode()
    

    然后是这个:

    dummy_hits = (dummy_msgs | 'Parse dummy msgs' >> ParseDecode()
    

    【讨论】:

    • 在上面的代码中,我们如何改变它,使第二个变换等到第一个变换完成。
    • 将第一个变换的输出传递给计数器,然后将计数器的输出作为侧输入传递给第二个变换。你可以让第二个转换忽略侧面输入
    • 我对此很陌生,无法理解你所说的。如何将变换的输出传递给计数器?如果可能的话,你能分享一下sn-p吗?
    猜你喜欢
    • 2022-12-31
    • 1970-01-01
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 2023-04-10
    • 1970-01-01
    • 2023-02-03
    • 2023-04-10
    相关资源
    最近更新 更多