【问题标题】:flatten list of lists in (key, List[List]) PCollection, retaining the key展平 (key, List[List]) PCollection 中的列表列表,保留键
【发布时间】:2021-10-18 13:41:39
【问题描述】:

我有一个 (key, value) 对的 PCollection,其中值本身就是一个列表

{'apple': ['1','2'],
 'watermelon': ['4','5']
 'apple': ['9','10']
 }

我想获得另一个具有相同键的集合,但列表是原始集合上的GroupByKey,但结果值是扁平化的。

{'apple': ['1','2','9','10'],
 'watermelon': ['4','5']
 }

而不是

{'apple': [['1','2], ['9','10']],
 'watermelon': [['4','5']]
 }

我已经尝试了一些事情,但我不知道如何去做。

我想人们总是可以编写一个ParDo 函数来按每个元素展平列表,但感觉必须有一个更简单的解决方案。

【问题讨论】:

    标签: apache-beam


    【解决方案1】:

    您要查找的逻辑操作是“按键组合”,其中组合器是列表连接(在 Python 中称为 +)。

    根据数据的特性,您可能希望使用新的CombineFn 来实现此操作,它将列表作为累加器进行变异。这几乎相当于执行GroupByKey,然后在ParDo 中展平列表。

    执行上的区别在于,Combine 操作可以在打乱你的数据之前执行,因为 Beam runner 知道它是一个关联和交换操作。在这种情况下,它不会减少 shuffled 的数据量,所以它不是那么重要。

    【讨论】:

    • 正确。所以我应该把它看作是一个以列表连接为操作符的“reduce”操作,这里reducer被称为combiner。我觉得有道理,会试试看。
    【解决方案2】:

    您是否正在寻找类似的东西

    import apache_beam as beam
    
    
    class Combiner(beam.CombineFn):
    
        def create_accumulator(self, *args, **kwargs):
            return {}
        
        def add_input(self, acc, element):
            key = element[0]
            value = element[1]
            if key in acc:
                value.extend(acc[key])
            acc[key]=value
            return acc
        
        def merge_accumulators(self, accumulators):
            return accumulators
    
        def extract_output(self, accumulator):
            return accumulator
    
    with beam.Pipeline() as p:
        pipe =(
            p
              |'create'>> beam.Create(
                    
                    [
                    ('apple', ['1','2']),
                    ('watermelon', ['4','5']),
                    ('apple', ['9','10']),
                    ('watermelon', ['49','50'])
                    ]
                    
     )
              |'combiner'>>beam.CombineGlobally(Combiner())
              |'print'>>beam.Map(print)
                 
        )
    
    #o/p - [{'apple': ['9', '10', '1', '2'], 'watermelon': ['49', '50', '4', '5']}]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-03-13
      • 2015-01-12
      • 2015-05-15
      • 1970-01-01
      • 1970-01-01
      • 2021-07-06
      • 2019-08-11
      • 1970-01-01
      相关资源
      最近更新 更多