【问题标题】:Spark : how can evenly distribute my records in all partitionSpark:如何在所有分区中均匀分布我的记录
【发布时间】:2015-11-17 23:19:55
【问题描述】:

我有一个包含 30 条记录的 RDD(键/值对:键是时间戳,值是 JPEG 字节数组)
我正在运行 30 个执行程序。我想将此 RDD 重新分区为 30 个分区,以便每个分区获取一条记录并分配给一个执行程序。

当我使用rdd.repartition(30) 时,它会将我的 rdd 重新分区为 30 个分区,但有些分区获得 2 条记录,有些获得 1 条记录,有些则没有获得任何记录。

Spark 中有什么方法可以将我的记录平均分配到所有分区。

【问题讨论】:

  • RDD已经有4个分区
  • 当您调用repartition时,Spark 确实会重新分区您的数据,但所有分区不一定包含完全相同数量的记录。这里真正的问题是,为什么这很重要?
  • 嗨,我想使用 spark 流在 1 秒内对所有 30 条记录执行操作。现在我的算法需要大约 400 毫秒才能对 1 条记录执行操作。所以我想平均分配我的记录,以便每个执行者只对一条记录执行操作,这样我的 1 秒批次将立即完成。

标签: apache-spark


【解决方案1】:
可以使用

加盐技术,该技术涉及添加新的“假”密钥并与当前密钥一起使用以更好地分布数据。

(here is link for salting)

【讨论】:

    【解决方案2】:

    您可以通过使用partitionBy 命令并提供多个分区来强制进行新分区。默认情况下,分区器是基于散列的,但您可以切换到基于范围的分区以获得更好的分布。如果您真的想强制重新分区,您可以使用随机数生成器作为分区函数(在 PySpark 中)。

    my_rdd.partitionBy(pCount, partitionFunc = lambda x: np.random.randint(pCount))
    

    然而,这会经常导致低效的 shuffle(节点之间传输大量数据),但如果您的进程是计算受限的,那么它是有意义的。

    【讨论】:

      【解决方案3】:

      下面是一个将 rdd 重新分区为npartitions 分区的示例,以便项目在分区中均匀分布。每个分区中的项目数最多相差1。

      evenly_repartitioned = (
          rdd
          .zipWithIndex()
          .map(lambda p: (p[1], p[0]))
          .partitionBy(N, lambda p: p)
          .values()
      )
      

      确实如此:

      • 创建一个 (item, index) 元组,其中索引覆盖整个 RDD
      • 交换键和值,所以现在RDD包含(index, item)
      • 使用标识partitionFunc 重新分区到N 分区,将项目移动到分区index % N
      • 只取值,删除元组中的索引。

      请注意,这比默认的基于哈希的重新分区要慢,因为它需要在zipWithIndex() 期间另一个 Spark 阶段来计算每个分区的大小。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-10-25
        • 2016-04-05
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多