【Question title】: Google Dataflow - Exclude one PCollection<String> from another PCollection<String>
【Posted】: 2018-07-27 20:01:19
【Question】:

I have two PCollections, as shown below:

P1 = ['H','E','L','L','O','W','O','R','L','D']

P2 = ['W','E','L','C','O','M','E']

I want to drop from the first collection every element that also appears in the second, to get the result below:

Result = ['H','R','D']
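
For reference, the intended result can be pinned down with a small in-memory sketch in plain Python. The function name `exclude` is hypothetical, and collapsing duplicates is an assumption: the example above does not show whether a repeated element of P1 should appear more than once in the result.

```python
def exclude(p1, p2):
    """Return the distinct elements of p1 that do not occur in p2."""
    blocked = set(p2)   # elements that must not appear in the result
    seen = set()        # elements already emitted (assumption: deduplicate)
    result = []
    for item in p1:
        if item not in blocked and item not in seen:
            seen.add(item)
            result.append(item)
    return result


P1 = ['H', 'E', 'L', 'L', 'O', 'W', 'O', 'R', 'L', 'D']
P2 = ['W', 'E', 'L', 'C', 'O', 'M', 'E']
print(exclude(P1, P2))  # ['H', 'R', 'D']
```

This only works when both collections fit in memory, so it is useful as a check on a distributed version rather than as a replacement for one.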

What is an optimized, fast way to do this?

【Question comments】:

    Tags: google-cloud-dataflow apache-beam


    【Solution 1】:

    Use CombinePerKey: https://beam.apache.org/documentation/programming-guide/#combine

    Python: https://beam.apache.org/documentation/sdks/pydoc/2.5.0/apache_beam.transforms.core.html?highlight=combineperkey#apache_beam.transforms.core.CombinePerKey

    Java: https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/Combine.PerKey.html

    1. Convert P1 and P2 into (element, source) tuples like this:

    Code:

    P1 = [('H', 'P1'), ('E', 'P1'), ('L', 'P1'), ('L', 'P1'), ('O', 'P1'), ('W', 'P1'), ('O', 'P1'), ('R', 'P1'), ('L', 'P1'), ('D', 'P1')]
    
    P2 = [('W', 'P2'), ('E', 'P2'), ('L', 'P2'), ('C', 'P2'), ('O', 'P2'), ('M', 'P2'), ('E', 'P2')]
    
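    2. Flatten the two PCollections into one.

    3. Pass the flattened PCollection to CombinePerKey, with a CombineFn that flags, for each string, whether it occurs in P1 and whether it occurs in P2:

    Code: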

    import apache_beam
    
    class IsInBoth(apache_beam.CombineFn):
        def _add_inputs(self, elements, accumulator=None):
            accumulator = accumulator or self.create_accumulator()
            for obj in elements:
                if obj == 'P1':
                    accumulator['P1'] = True
                if obj == 'P2':
                    accumulator['P2'] = True
            return accumulator
    
        def create_accumulator(self):
            return {'P1': False, 'P2': False}
    
        def add_input(self, accumulator, element, *args, **kwargs):
            return self._add_inputs(elements=[element], accumulator=accumulator)
    
        def add_inputs(self, accumulator, elements, *args, **kwargs):
            return self._add_inputs(elements=elements, accumulator=accumulator)
    
        def merge_accumulators(self, accumulators, *args, **kwargs):
            return {
                'P1': any([i['P1'] for i in accumulators]),
                'P2': any([i['P2'] for i in accumulators])}
    
        def extract_output(self, accumulator, *args, **kwargs):
            return accumulator
    
    4. From the CombinePerKey output, filter out the results that have {'P1': True, 'P2': True}.

    【Comments】:

    • I really appreciate your code; it is great and helped me with a similar task. But according to the question, the filter should be {'P1': True, 'P2': False}.