【问题标题】:Cartesian Product of PCollection with itselfPCollection 与自身的笛卡尔积
【发布时间】:2020-02-19 04:54:03
【问题描述】:

假设我有一个KV<String, Integer> 类型的有界 PCollection p。假设 p 无法放入内存,因此不能作为 DoFn 的侧输入。

示例 p:

("foo", 0)
("bar", 1)
("baz", 2)

我怎样才能得到 p 和它自己的笛卡尔积?

例如,p x p 可能如下所示:

("foo+foo", [("foo", 0), ("foo", 0)])
("foo+bar", [("foo", 0), ("bar", 1)])
("foo+baz", [("foo", 0), ("baz", 2)])
("bar+foo", [("bar", 1), ("foo", 0)])
("bar+bar", [("bar", 1), ("bar", 1)])
("bar+baz", [("bar", 1), ("baz", 2)])
("baz+foo", [("baz", 2), ("foo", 0)])
("baz+bar", [("baz", 2), ("bar", 1)])
("baz+baz", [("baz", 2), ("baz", 2)])

【问题讨论】:

    标签: java python apache-beam


    【解决方案1】:

    正如您所推测的,最简单的方法是使用 DoFn 将您的 PCollection 处理为主要和辅助输入。

    如果这不起作用,因为 PCollection 太大而无法放入内存,您可以将其划分为 N 个不相交的 PCollection,将其传递给每个 PCollection,然后将结果展平。例如,你可以写类似

    class CrossProduct(beam.PTransform):
      def expand(self, pcoll):
        N = 10
        parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)
        cross_parts = [
            pcoll | str(ix) >> beam.FlatMap(
                lambda x, side: [(x, s) for s in side],
                beam.pvalue.AsIter(part))
            for ix, part in enumerate(parts)]
        return cross_parts | beam.Flatten()
    
    output = input | CrossProduct()
    

    但是请注意,除非您的 PCollection 的元素特别大,否则如果 PCollection 无法装入内存,则其叉积可能会非常大,无法生成(和处理)。

    【讨论】:

      【解决方案2】:

      我将提出一个使用 Python 的解决方案。

      首先,让我们实现算法,然后解决内存限制问题

      import itertools
      
      # Let's build a list with your pairs
      collection_items = [("foo", 0), ("bar", 1), ("baz", 2)]
      
      """
      A Python generator is a function that produces a sequence of results. 
      It works by maintaining its local state, so that the function can resume again exactly where 
      it left off when called subsequent times. Same generator can't be used twice.
      I will explain a little later why I use generators
      """
      
      collection_generator1 = (el for el in collection_items)  # Create the first generator
      # For example; calling next(collection_generator1) => ("foo", 0); next(collection_generator1) => ("bar", 1),
      # next(collection_generator1) => ("bar": 2)
      collection_generator2 = (el for el in collection_items) # Create the second generator
      cartesian_product = itertools.product(collection_generator1, collection_generator2) # Create the cartesian product
      
      for pair in cartesian_product:
          first_el, second_el = pair
          str_pair1, val_pair1 = first_el
          str_pair2, val_pair2 = first_el
      
          name = "{str_pair1}+{str_pair2}".format(str_pair1=str_pair1, str_pair2=str_pair2)
          item = (name, [first_el, second_el]) # Compose the item
          print(item)
      
      # OUTPUT
      
      ('foo+foo', [('foo', 0), ('foo', 0)])
      ('foo+foo', [('foo', 0), ('bar', 1)])
      ('foo+foo', [('foo', 0), ('baz', 2)])
      ('bar+bar', [('bar', 1), ('foo', 0)])
      ('bar+bar', [('bar', 1), ('bar', 1)])
      ('bar+bar', [('bar', 1), ('baz', 2)])
      ('baz+baz', [('baz', 2), ('foo', 0)])
      ('baz+baz', [('baz', 2), ('bar', 1)])
      ('baz+baz', [('baz', 2), ('baz', 2)])
      

      现在让我们解决内存问题

      因为你有很多数据,一个好主意是将它们存储在一个文件中,在每一行写一对(如你的例子) 现在让我们读取文件(“input.txt”)并使用其数据创建一个生成器。

      file_generator_1 = (line.strip() for line in open("input.txt"))
      file_generator_2 = (line.strip() for line in open("input.txt").readlines())
      

      现在,您需要做的唯一修改是将变量名称 collection_generator1、collection_generator2 替换为 file_generator_1、file_generator_2

      【讨论】:

        猜你喜欢
        • 2013-06-27
        • 1970-01-01
        • 2021-02-07
        • 1970-01-01
        • 2016-11-22
        • 2013-01-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多