【问题标题】:Spark - Equal partitioning of RDD on nodesSpark - 节点上 RDD 的相等分区
【发布时间】:2019-05-21 02:40:52
【问题描述】:

我有一个有 4 个节点的架构和一个有 4000 行的 RDD,我需要在节点上平均地重新分区这个 RDD。结果应该是:

node 1 -> 1000 rows
node 2 -> 1000 rows
node 3 -> 1000 rows
node 4 -> 1000 rows.

如何在 Python 中做到这一点?

【问题讨论】:

标签: python apache-spark pyspark rdd partitioning


【解决方案1】:

我尝试使用您已经在使用的 pyspark 来实施 @sramalingam24 已经提出的解决方案。

from collections import Counter

data = [(i,j) for i,j in zip([i/1000 for i in range(0, 4000, 1)], range(500, 4500, 1))]
rdd = sc.parallelize(data).map(lambda x : (x[0], x[1]))
df = sqlContext.createDataFrame(rdd, ['key', 'values'])
df = df.repartition('key')

检查结果:

Counter(df.select(spark_partition_id()).collect())
Out[*]: Counter({Row(SPARK_PARTITION_ID()=5): 1000, Row(SPARK_PARTITION_ID()=128): 1000, Row(SPARK_PARTITION_ID()=107): 1000, Row(SPARK_PARTITION_ID()=69): 1000})

【讨论】:

  • 谢谢@sramalingam24 和@Chiheb.K!我不知道这个解决方案!非常感谢你们。
【解决方案2】:

如果你使用 textFile 来构建你可以使用的 rdd:

scala> val rdd = sc.textFile("hdfs://.../input.txt", 4)

或者你可以使用:

scala> rdd = rdd.repartition(4)

rdd.repartition(n) 进行洗牌以拆分数据以匹配 n 个分区。

【讨论】:

  • 这行不通。 repartiton()无法保证跨分区的均匀分布。你肯定会得到 4 个分区,但它们的负载分布并不均匀。
  • 我同意@cph_sto。我尝试过使用 repartition(n),但它并没有在节点上平均分割。
  • @Giulia 为什么你甚至想要严格地平均分配负载?恕我直言,不需要它。如果它有额外的优势,SPARK 会以这种方式实现它。
  • @cph_sto 要解释原因,您应该查看我的数据集,但我试图以简单的方式向您解释。我的数据集的前 1000 行是 worker0 专有的信息,后 1000 行是 worker1 专有的,依此类推。如果前 1000 行中的某些行出现在 worker1 上而不是 worker0 上,那么 worker1 中就会出现冗余。所以我需要设置每个分区的行数。
  • 取 rdd 和 zip 索引,转换为数据框,并在索引列上进行整数除以 1000 并按该列重新分区。这当然是假设您还没有根据需要划分数据的列
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-05-17
  • 2021-02-24
  • 1970-01-01
  • 2019-06-16
  • 2020-09-18
  • 2016-01-04
  • 2017-02-17
相关资源
最近更新 更多