【问题标题】:How can i merge partitions by pairs in Spark?如何在 Spark 中成对合并分区?
【发布时间】:2015-08-29 22:06:18
【问题描述】:

我有一组点(每个点都是文本文件中的一行),它们分布在 Spark 的分区之间。

我使用 mapPartitions 操作,它在每个分区中得到一半的点(并不重要为什么和如何)。

现在,我想成对合并分区,因此分区 #1 和 #2 将是合并分区,分区 #3 和 #4 将是第二个合并分区,依此类推。

我将继续运行 mapPartitions,直到我只留下几个分区。 如何使用 Spark 做到这一点?

这里与 Hadoop 的类似物是我成对合并输出文件并再次运行任务。

我会再次澄清一下:我有 x 个分区,在我运行 mapPartitions 之后, 我想成对合并它们,所以我会有 x/2 个分区并再次运行 mapPartitions 等等..

【问题讨论】:

  • 如果您投反对票,很高兴知道原因。

标签: apache-spark


【解决方案1】:

使用treeAggregate 作为模型应该可以解决问题:

from math import log

def binaryReduce(rdd, f):
    assert log(rdd.getNumPartitions(), 2) % 1 == 0
    def mapPartition(i, iter):
        i = i / 2
        for x in iter:
            yield i, x

    while rdd.getNumPartitions() != 1:
        rdd = (rdd
            .mapPartitionsWithIndex(mapPartition)
            .reduceByKey(f, rdd.getNumPartitions() / 2)
            .values())

    return rdd.first()

如果您更喜欢更明确的方法,您可以一直保留分区号:

def binaryReduce(rdd, f):
    assert log(rdd.getNumPartitions(), 2) % 1 == 0

    def initPartition(i, iter):
        for x in iter:
            yield i, x

    rdd = rdd.mapPartitionsWithIndex(initPartition)

    while rdd.getNumPartitions() != 1:
        rdd = (rdd
            .reduceByKey(f)
            .map(lambda x: (x[0] / 2, x[1]))
            .partitionBy(rdd.getNumPartitions() / 2))

    return rdd.values().reduce(f)

对于 Python 3.0+,请务必将 / 替换为 //

【讨论】:

  • values() 对此有何影响?它是否将所有分区聚集在一起?它可以在分区之间混合值吗?
  • 所以如果它不在分区之间交换值,那么您在这里向我展示的两种方法有什么区别?
  • 另外,你为什么要使用这个断言?
  • _另外,你为什么要使用这个断言? _ - 能够创建完整的二叉树。 what is the difference between both approaches you showed me here - 你可以说第一个取决于实现的细节,否则没有实际区别。
【解决方案2】:

这个问题很模糊。 如果我做对了,您可以在将数据映射到 (key,value) 后尝试 reduceByKey。

(http://spark.apache.org/docs/latest/programming-guide.html#parallelized-collections)

希望这会有所帮助。

编辑:您应该将 mapPartitionsWithIndex(func) 与必须为 (Int, Iterator) => Iterator 类型的 func 一起使用。

【讨论】:

  • 很好的建议,但是我怎样才能给分区编号呢?我想要分区 1、2、3 ......我怎样才能做到这一点?我的意思是:我需要以一种有助于 reduceByKey 的方式提供密钥,我怎样才能得到它?
  • 您只需要确保要连接在一起的分区中的值具有相同的键。
  • 我希望前两个分区的键为 1,接下来的两个分区的键为 2,依此类推。我怎么知道给每个分区什么键?它非常接近我正在寻找的解决方案
  • mapPartitionsWithIndex(func),func 必须是 (Int, Iterator) => Iterator 类型。使用 int/2 作为键?因为 int 是分区号,所以分区号为 1 和 2,key = 1,分区号为 3 和 4,key = 2,依此类推。
  • 听起来很酷。在我将 reduceByKey 之后,每个键都将分配给它的分区吗?我的意思是,我不能将 key = 4 分成两个不同的分区?
猜你喜欢
  • 2016-10-02
  • 1970-01-01
  • 1970-01-01
  • 2015-10-15
  • 2015-01-01
  • 2016-04-22
  • 1970-01-01
  • 2020-03-09
  • 1970-01-01
相关资源
最近更新 更多