【Question title】: How to flatten multiple PCollections in Python Apache Beam
【Posted】: 2019-05-16 18:30:45
【Question description】:

How should the following logic, shown at https://beam.apache.org/documentation/pipelines/design-your-pipeline/, be implemented:

// merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

so that multiple PCollections can be combined into a single PCollection with the Apache Beam Python API?

【Question comments】:

    Tags: python stream apache-beam


    【Solution 1】:

    You can use the Flatten transform in Python as well, by applying it to a tuple of PCollections. For example:

    data1 = ['one', 'two', 'three']
    data2 = ['four','five']
    
    input1 = p | 'Create PCollection1' >> beam.Create(data1)
    input2 = p | 'Create PCollection2' >> beam.Create(data2)
    
    merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())
    

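    The merged PCollection will contain the following elements (a PCollection does not guarantee element order, so the order may differ between runs):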

    INFO:root:one
    INFO:root:two
    INFO:root:three
    INFO:root:four
    INFO:root:five
    

    Full code:

    import argparse, logging
    
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    
    
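    class LogFn(beam.DoFn):
      """Logs each element and passes it through unchanged."""
      def process(self, element):
        logging.info(element)
        # Yield rather than return: a returned string would be iterated
        # character by character as the DoFn's output.
        yield element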
    
    
    def run(argv=None):
      parser = argparse.ArgumentParser()
      known_args, pipeline_args = parser.parse_known_args(argv)
    
      pipeline_options = PipelineOptions(pipeline_args)
      pipeline_options.view_as(SetupOptions).save_main_session = True
      p = beam.Pipeline(options=pipeline_options)
    
      data1 = ['one', 'two', 'three']
      data2 = ['four','five']
    
      input1 = p | 'Create PCollection1' >> beam.Create(data1)
      input2 = p | 'Create PCollection2' >> beam.Create(data2)
    
      merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())
    
      merged | 'Check Results' >> beam.ParDo(LogFn())
    
      result = p.run()
      result.wait_until_finish()
    
    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
    
