【问题标题】:How to efficiently distribute and use partitions in spark?spark中如何高效分配和使用partition?
【发布时间】:2017-08-19 17:12:32
【问题描述】:

这是我的例子。

val arr = Array((1,2), (1,3), (1,4), (2,3), (4,5))
val data = sc.parallelize(arr, 5)

data.glom.map(_length).collect
Array[Int] = Array(1, 1, 1, 1, 1)

val agg = data.reduceByKey(_+_)
agg.glom.map(_.length).collect
Array[Int] = Array(0, 1, 1, 0, 1)

val fil = agg.filter(_._2 < 4)
fil.glom.map(_.length).collect
Array[Int] = Array(0, 0, 1, 0, 0)

val sub = data.map{case(x,y) => (x, (x,y))}.subtractByKey(fil).map(_._2)
Array[(Int, Int)] = Array((1,4), (1,3), (1,2), (4,5))

sub.glom.map(_.length).collect
Array[Int] = Array(0, 3, 0, 0, 1)

我想知道的是平均分配分区。

data 变量由五个分区组成,所有数据均匀分区。

ex)par1: (1,2)
   par2: (1,3)
   par3: (1,4)
   par4: (2,3)
   par5: (4,5)

在几个transformation operation之后,分配给sub变量的五个分区中只有两个被使用。

sub 变量由五个分区组成,但并非所有数据都被均匀分区。

ex)par1: empty
   par2: (1,2),(1,3),(1,4)
   par3: empty
   par4: empty
   par5: (4,5)

如果我在sub 变量中添加另一个transformation operation,将有5 个可用分区,但只有2 个分区用于操作。

ex)sub.map{case(x,y) => (x, x, (x,y))}

所以我想在操作数据时利用所有可用的分区。

我用了repartition的方法,但并不便宜。

ex) sub.repartition(5).glom.map(_.length).collect
Array[Int] = Array(0, 1, 1, 2, 0)

所以我正在寻找一种明智的方法来利用尽可能多的分区。

有什么好办法吗?

【问题讨论】:

    标签: apache-spark load-balancing rdd partitioning wise


    【解决方案1】:

    所以repartition 绝对是要走的路:)

    您的示例有点过于简单,无法演示任何内容,因为 Spark 是为处理数十亿行而不是 5 行而构建的。 repartition 不会将完全相同相同数量的行放入每个分区,但它会均匀分布数据。尝试用 1.000.000 行重做您的示例,您会发现在repartition 之后数据确实分布均匀。

    在处理大量数据的转换时,数据倾斜通常是一个大问题,并且重新分区数据确实会带来额外的时间成本,因为它需要对数据进行混洗。不过,有时值得接受惩罚,因为它会使后续的转换阶段运行得更快。

    【讨论】:

      猜你喜欢
      • 2021-03-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-02
      • 1970-01-01
      • 2017-10-11
      相关资源
      最近更新 更多