【问题标题】:Optimal number of partitions in a grouped PairRDD in SparkSpark中分组PairRDD中的最佳分区数
【发布时间】:2019-01-26 19:00:15
【问题描述】:

我有两个结构为 RDD[String, Int] 的 RDD 对,分别称为 rdd1 和 rdd2。

这些 RDD 中的每一个都按其键分组,我想对其值执行一个函数(因此我将使用 mapValues 方法)。 “GroupByKey”方法是为每个键创建一个新分区还是让我使用“partitionBy”手动指定?

我知道如果我不执行更改键的操作,RDD 的分区不会改变,所以如果我对每个 RDD 执行 mapValues 操作,或者如果我在前两个 RDD 之间执行连接操作,生成的 RDD 的分区不会改变。 是真的吗?

这里有一个代码示例。请注意,“功能”没有定义,因为它在这里并不重要。

val lvl1rdd=rdd1.groupByKey()
val lvl2rdd=rdd2.groupByKey()
val lvl1_lvl2=lvl1rdd.join(lvl2rdd)
val finalrdd=lvl1_lvl2.mapValues(value => function(value))

如果我加入之前的 RDD 并在生成的 RDD (mapValues) 的值上执行一个函数,则所有工作都在单个 worker 中完成,而不是将不同的任务分布在集群的不同 worker 节点上。我的意思是,期望的行为应该是并行执行作为参数传递给 mapValues 方法的函数,在集群允许的这么多节点中。

【问题讨论】:

标签: scala apache-spark rdd partitioning


【解决方案1】:

1) 避免 groupByKey 操作,因为它们会成为网络 I/O 和执行性能的瓶颈。 在这种情况下首选 reduceByKey 操作,因为数据 shuffle 相对小于 groupByKey,如果它是更大的 Dataset,我们可以更好地看到差异。

val lvl1rdd = rdd1.reduceByKey(x => function(x)) 
val lvl1rdd = rdd2.reduceByKey(x => function(x))
//perform the Join Operation on these resultant RDD's

在 RDD 上单独应用函数并加入它们比加入 RDD 并使用 groupByKey() 应用函数要好得多

这也将确保任务在不同的执行者之间分配并并行执行

Refer this link

2) 底层的分区技术是Hash partitioner。如果我们假设我们的数据最初位于 n 个分区中,那么 groupByKey 操作将遵循 Hash 机制。

partition = key.hashCode() % numPartitions

这将创建固定数量的分区,当您使用 groupByKey 操作时可以超过初始数量。我们还可以自定义要创建的分区。例如

val result_rdd = rdd1.partitionBy(new HashPartitioner(2))

这将创建 2 个分区,通过这种方式我们可以设置分区的数量。 要确定最佳分区数,请参阅此答案https://stackoverflow.com/a/40866286/7449292

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-18
    • 2018-08-18
    • 2017-12-02
    • 1970-01-01
    • 1970-01-01
    • 2023-03-18
    • 2017-01-15
    相关资源
    最近更新 更多