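In newer versions of Spark, you can write your own Partitioner and combine it with the zipWithIndex method.
The idea is to:
- index your RDD
- use the index as the key
- apply a custom Partitioner based on the desired number of partitions
Sample code looks like this: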
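// imports needed by the snippets below
import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// define a custom Partitioner class
class EqualDistributionPartitioner(numberOfPartitions: Int) extends Partitioner {
  override def numPartitions: Int = numberOfPartitions
  // keys are the Long indices from zipWithIndex, so index % n assigns records round-robin
  override def getPartition(key: Any): Int = {
    (key.asInstanceOf[Long] % numberOfPartitions).toInt
  }
}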
// create test RDD (starting with one partition)
val testDataRaw = Seq(
  ("field1_a", "field2_a"),
  ("field1_b", "field2_b"),
  ("field1_c", "field2_c"),
  ("field1_d", "field2_d"),
  ("field1_e", "field2_e"),
  ("field1_f", "field2_f"),
  ("field1_g", "field2_g"),
  ("field1_h", "field2_h"),
  ("field1_k", "field2_k"),
  ("field1_l", "field2_l"),
  ("field1_m", "field2_m"),
  ("field1_n", "field2_n")
)
val testRdd: RDD[(String, String)] = spark.sparkContext.parallelize(testDataRaw, 1)
// create index
val testRddWithIndex: RDD[(Long, (String, String))] = testRdd.zipWithIndex().map(msg => (msg._2, msg._1))
// use the index for equal distribution
// Example with two partitions
println("Example with 2 partitions:")
val equallyDistributedPartitionTwo = testRddWithIndex.partitionBy(new EqualDistributionPartitioner(2))
equallyDistributedPartitionTwo.foreach(k => println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))
println("\nExample with 4 partitions:")
// Example with four partitions
val equallyDistributedPartitionFour = testRddWithIndex.partitionBy(new EqualDistributionPartitioner(4))
equallyDistributedPartitionFour.foreach(k => println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))
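Here, spark is your SparkSession.
The output looks like this (the order of the lines can vary between runs, since partitions are processed in parallel):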
Example with 2 partitions:
Partition: 0, Content: (0,(field1_a,field2_a))
Partition: 1, Content: (1,(field1_b,field2_b))
Partition: 0, Content: (2,(field1_c,field2_c))
Partition: 1, Content: (3,(field1_d,field2_d))
Partition: 0, Content: (4,(field1_e,field2_e))
Partition: 1, Content: (5,(field1_f,field2_f))
Partition: 0, Content: (6,(field1_g,field2_g))
Partition: 1, Content: (7,(field1_h,field2_h))
Partition: 0, Content: (8,(field1_k,field2_k))
Partition: 1, Content: (9,(field1_l,field2_l))
Partition: 0, Content: (10,(field1_m,field2_m))
Partition: 1, Content: (11,(field1_n,field2_n))
Example with 4 partitions:
Partition: 0, Content: (0,(field1_a,field2_a))
Partition: 0, Content: (4,(field1_e,field2_e))
Partition: 0, Content: (8,(field1_k,field2_k))
Partition: 3, Content: (3,(field1_d,field2_d))
Partition: 3, Content: (7,(field1_h,field2_h))
Partition: 3, Content: (11,(field1_n,field2_n))
Partition: 1, Content: (1,(field1_b,field2_b))
Partition: 1, Content: (5,(field1_f,field2_f))
Partition: 1, Content: (9,(field1_l,field2_l))
Partition: 2, Content: (2,(field1_c,field2_c))
Partition: 2, Content: (6,(field1_g,field2_g))
Partition: 2, Content: (10,(field1_m,field2_m))
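The assignments in the output above follow directly from the modulo rule in getPartition. As a minimal sketch that runs without Spark, the plain Scala below mirrors that rule and counts how many indices land in each partition. RoundRobinSketch, partitionOf and partitionSizes are hypothetical names introduced here for illustration; they are not part of Spark or of the code above.

```scala
// Plain-Scala sketch (no Spark required); all names here are illustrative assumptions.
object RoundRobinSketch {
  // Same rule as EqualDistributionPartitioner.getPartition: index modulo partition count.
  def partitionOf(index: Long, numPartitions: Int): Int =
    (index % numPartitions).toInt

  // Count how many of the indices 0 until n fall into each partition.
  def partitionSizes(n: Long, numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    (0L until n).foreach(i => counts(partitionOf(i, numPartitions)) += 1)
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    println(partitionSizes(12, 4)) // Vector(3, 3, 3, 3), as in the 12-record example
    println(partitionSizes(14, 4)) // Vector(4, 4, 3, 3)
  }
}
```

When the record count is not a multiple of the partition count, the sizes differ by at most one, which is why indexing with zipWithIndex and partitioning by index gives an even distribution regardless of the original keys.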