【问题标题】:How to Define Custom partitioner for Spark RDDs of equally sized partition where each partition has equal number of elements?如何为每个分区具有相同数量元素的相同大小分区的 Spark RDD 定义自定义分区器?
【发布时间】:2014-06-01 09:09:46
【问题描述】:

我是 Spark 的新手。我有一个大的元素数据集[RDD],我想把它分成两个大小完全相同的分区,保持元素的顺序。我尝试使用RangePartitioner 喜欢

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))

这并没有给出令人满意的结果,因为它大致划分但不完全相等的大小保持元素的顺序。 例如,如果有 64 个元素,我们使用 Rangepartitioner,则分为31个元素和33个元素。

我需要一个分区器,这样我就可以准确地在一半中获得前 32 个元素,而另一半包含第二组 32 个元素。 您能否通过建议如何使用自定义分区器来帮助我,以便我得到相同大小的两半,并保持元素的顺序?

【问题讨论】:

  • 嗨!你在哪里调用partitionBy,我在spark文档中找不到这个方法。在我定义了一个新的分区器后,我如何将现有的 RDD 分区为一组新的分区?谢谢!
  • partitionBy 在 [PairRDDFunctions](spark.apache.org/docs/latest/api/core/…) 中,因此您可以在任何RDD[K,V] 上调用它。这个类里面隐藏了一堆必不可少的方法,看看吧!
  • 谢谢丹尼尔!一定会检查出来的。
  • 好问题,我曾经使用CoalescedRDD,但他们在 1.0.0 中将其设为私有

标签: scala hadoop apache-spark


【解决方案1】:

Partitioners 通过将密钥分配给分区来工作。您需要事先了解密钥分配,或查看所有密钥,才能制作这样的分区程序。这就是 Spark 不为您提供的原因。

一般来说,您不需要这样的分区器。事实上,我无法想出一个需要相同大小分区的用例。如果元素个数是奇数怎么办?

无论如何,假设您有一个由连续Ints 键控的 RDD,并且您知道总共有多少个。然后你可以像这样写一个自定义的Partitioner

class ExactPartitioner[V](
    partitions: Int,
    elements: Int)
  extends Partitioner {

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    // `k` is assumed to go continuously from 0 to elements-1.
    return k * partitions / elements
  }
}

【讨论】:

  • 感谢 Daniel 的回复。它奏效了。我正在研究一种算法,该算法在数据集中包含偶数个元素。
  • 一旦你定义了这个新类,你在哪里调用它? RDD中的Partitioner是一个val,我无法更改,如果我用这个自定义的Partitioner定义了一个新的RDD,如何用方法创建它?
  • 倾斜会引入额外的处理时间。它是通过让一个执行器比另一个执行器有更多待处理的任务来引入的,或者因为分区的大小不相等(一个任务运行的时间比另一个任务长)。我通常会说过度调度更多(远远超过可用的核心)和更小的分区,以便偏差消失在噪音中。比尝试在单个执行器上精确匹配任务要好。
  • 请注意,还有另一种影响分区方式的方法。默认情况下它使用HashPartitioner,因此通过覆盖hashCode 方法,您还可以直接影响分区。
  • @Danial 解决方案确实有效。但是,当我将密钥设为“4”而没有时,如何将密钥分配给分区。分区为“2”,没有。这种情况下的元素“4”? Bcoz 使用给定的参数,'PartitionID' 将变为 2 (2*4/4),我们必须只有 2 个分区,即 '0' 和 '1'?
【解决方案2】:

这个答案从 Daniel 那里得到了一些灵​​感,但提供了一个完整的实现(使用 pimp my library pattern),并提供了一个满足人们复制和粘贴需求的示例:)

import RDDConversions._

trait RDDWrapper[T] {
  def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
  rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced, 
  // but for really large dataset, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
    .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) => 
        Iterator((i, iter.toIterable.last))).toArray().toList
      .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      def getPartition(key: Any): Int = 
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}

然后在另一个文件中

// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
  implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = 
    new RichRDD[T](rdd)
  implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}

然后对于您想要的用例(假设它已经排序)

import RDDConversions._

yourRdd.grouped(2)

免责声明:未经测试,只是将其直接写到 SO 答案中

【讨论】:

  • 这个“partitionBy”方法在哪里?我只在 JavaRDD 中看到它,而不是在 scala RDD 中看到它。更新:OK 在 PairRDDFunctions 中找到它(包含在implicits 中)
【解决方案3】:

在较新版本的 Spark 中,您可以编写自己的 Partitioner 并使用方法 zipWithIndex

这个想法是

  • 索引您的 RDD
  • 使用索引作为键
  • 根据所需分区的数量应用自定义Partitioner

示例代码如下所示:

  // define custom Partitioner Class
  class EqualDistributionPartitioner(numberOfPartitions: Int) extends Partitioner {
    override def numPartitions: Int = numberOfPartitions

    override def getPartition(key: Any): Int = {
      (key.asInstanceOf[Long] % numberOfPartitions).toInt
    }
  }

  // create test RDD (starting with one partition)
  val testDataRaw = Seq(
    ("field1_a", "field2_a"),
    ("field1_b", "field2_b"),
    ("field1_c", "field2_c"),
    ("field1_d", "field2_d"),
    ("field1_e", "field2_e"),
    ("field1_f", "field2_f"),
    ("field1_g", "field2_g"),
    ("field1_h", "field2_h"),
    ("field1_k", "field2_k"),
    ("field1_l", "field2_l"),
    ("field1_m", "field2_m"),
    ("field1_n", "field2_n")
  )
  val testRdd: RDD[(String, String)] = spark.sparkContext.parallelize(testDataRaw, 1)

  // create index
  val testRddWithIndex: RDD[(Long, (String, String))] = testRdd.zipWithIndex().map(msg => (msg._2, msg._1))

  // use index for equally distribution
  // Example with six partitions
  println("Example with 2 partitions:")
  val equallyDistributedPartitionTwo = testRddWithIndex.partitionBy(new EqualDistributionPartitioner(2))
  equallyDistributedPartitionTwo.foreach(k => println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))

  println("\nExample with 4 partitions:")
  // Example with four partitions
  val equallyDistributedPartitionFour = testRddWithIndex.partitionBy(new EqualDistributionPartitioner(4))
  equallyDistributedPartitionFour.foreach(k => println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))

spark 是你的SparkSession

作为输出你会得到:

Example with 2 partitions:
Partition: 0, Content: (0,(field1_a,field2_a))
Partition: 1, Content: (1,(field1_b,field2_b))
Partition: 0, Content: (2,(field1_c,field2_c))
Partition: 1, Content: (3,(field1_d,field2_d))
Partition: 0, Content: (4,(field1_e,field2_e))
Partition: 1, Content: (5,(field1_f,field2_f))
Partition: 0, Content: (6,(field1_g,field2_g))
Partition: 1, Content: (7,(field1_h,field2_h))
Partition: 0, Content: (8,(field1_k,field2_k))
Partition: 1, Content: (9,(field1_l,field2_l))
Partition: 0, Content: (10,(field1_m,field2_m))
Partition: 1, Content: (11,(field1_n,field2_n))

Example with 4 partitions:
Partition: 0, Content: (0,(field1_a,field2_a))
Partition: 0, Content: (4,(field1_e,field2_e))
Partition: 0, Content: (8,(field1_k,field2_k))
Partition: 3, Content: (3,(field1_d,field2_d))
Partition: 3, Content: (7,(field1_h,field2_h))
Partition: 3, Content: (11,(field1_n,field2_n))
Partition: 1, Content: (1,(field1_b,field2_b))
Partition: 1, Content: (5,(field1_f,field2_f))
Partition: 1, Content: (9,(field1_l,field2_l))
Partition: 2, Content: (2,(field1_c,field2_c))
Partition: 2, Content: (6,(field1_g,field2_g))
Partition: 2, Content: (10,(field1_m,field2_m))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多