【问题标题】:How to split a huge rdd and broadcast it by turns?如何拆分一个巨大的rdd并轮流广播?
【发布时间】:2016-04-11 21:55:32
【问题描述】:

说明

我们的 spark 版本是 1.4.1

我们想加入两个巨大的 RDD,其中一个带有倾斜数据。所以 spark rdd 操作 join 可能会导致内存问题。我们尝试将较小的一个分成几部分,然后分批广播它们。在每个广播轮次中,我们尝试将较小的 rdd 的一部分收集到驱动程序,然后将其保存到 HashMap,然后广播 HashMap。每个 executor 使用广播值对较大的 rdd 进行 map 操作。我们通过这种方式实现了我们的倾斜数据连接。

但是当它在每回合处理广播值时。我们发现我们无法在处理后破坏我们的广播值。如果我们使用broadcast.destroy(),下一轮我们处理数据将 触发错误。像这样:

java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)

我们查看了spark的源码,发现这个问题是由rdd依赖关系导致的。如果 rdd3 -> rdd2 -> rdd1 (箭头显示依赖关系)。 rdd1 使用名为 b1 的广播变量生成,rdd2 使用 b2。在生成 rdd3 时,源代码显示它需要序列化 ​​b1 和 b2。如果 b1 或 b2 在 rdd3 生成过程之前被销毁。这将导致我在上面列出的失败。

问题

有没有办法让rdd3忘记它的依赖,让它在生产过程中不需要b1、b2,只需要rdd2?

或者是否存在处理倾斜连接问题的方法?

顺便说一句,我们为每个回合设置了检查点。并将 spark.cleaner.ttl 设置为 600。问题仍然存在。如果我们不销毁广播变量,执行器将在第 5 回合丢失。

我们的代码是这样的:

for (int i = 0; i < times; i++) {
    JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;
    List<Tuple2<String, Double>> itemSplit = itemZippedRdd
            .filter(new FilterByHashFunction(times, i))
            .collect();

    Map<String, Double> itemSplitMap = new HashMap<String, Double>();
    for (Tuple2<String, Double> item : itemSplit) {
        itemSplitMap.put(item._1(), item._2());
    }
    Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
            .broadcast(itemSplitMap);

    curItemPairRdd = prevItemPairRdd
            .mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
            .persist(StorageLevel.DISK_ONLY());
    curItemPairRdd.count();

    itemSplitBroadcast.destroy(true);
    itemSplit.clear();

}

【问题讨论】:

  • 所以你尝试加入两个 RDD,但没有成功,你决定自己重新实现加入。我质疑这个决定。为什么你认为你的 join 实现会比 Spark 的更好?我宁愿建议你解决加入的问题。将这些问题发布为 Stack Overflow 问题。

标签: apache-spark


【解决方案1】:

我个人会尝试一些不同的方法。让我们从一个小的模拟数据集开始

import scala.util.Random
Random.setSeed(1)

val left = sc.parallelize(
  Seq.fill(200)(("a", Random.nextInt(100))) ++ 
  Seq.fill(150)(("b",  Random.nextInt(100))) ++ 
  Seq.fill(100)(Random.nextPrintableChar.toString, Random.nextInt(100))
)

按键计数:

val keysDistribution = left.countByKey

进一步假设第二个 RDD 是均匀分布的:

val right = sc.parallelize(
  keysDistribution.keys.flatMap(x => (1 to 5).map((x, _))).toSeq)

并将每个键可以处理的值的数量阈值设置为 10:

val threshold = 10
  1. 使用代理键来增加粒度。

    想法很简单。而不是加入(k, v) 对,而是使用((k, i), v),其中i 是一个整数,它取决于给定k 的阈值和元素数量。

    val buckets = keysDistribution.map{
      case (k, v) => (k -> (v / threshold + 1).toInt)
    }
    
    // Assign random i to each element in left
    val leftWithSurrogates = left.map{case (k, v) => {
      val i = Random.nextInt(buckets(k))
      ((k, i), v)
    }}
    
    // Replicate each value from right to i buckets
    val rightWithSurrogates = right.flatMap{case (k, v) => {
      (0 until buckets(k)).map(i => ((k, i), v))
    }}
    
    val resultViaSurrogates = leftWithSurrogates
      .join(rightWithSurrogates)
      .map{case ((k, _), v) => (k, v)}
    
  2. 分而治之 - 拆分处理频繁键和不频繁键。

    首先让我们使用不常用的键加入:

    val infrequentLeft = left.filter{
      case (k, _) => keysDistribution(k) < threshold
    }
    
    val infrequentRight = right.filter{
      case (k, _) => keysDistribution(k) < threshold
    }
    
    val infrequent = infrequentLeft.join(infrequentRight)
    

    接下来让我们分别处理每个频繁键:

    val frequentKeys = keysDistribution
      .filter{case (_, v) => v >= threshold}
      .keys
    
    val frequent = sc.union(frequentKeys.toSeq.map(k => {
      left.filter(_._1 == k)
        .cartesian(right.filter(_._1 == k))
        .map{case ((k, v1), (_, v2)) => (k, (v1, v2))}
    }))
    

    最后让两个子集合并:

    val resultViaUnion = infrequent.union(frequent)
    

快速健全性检查:

val resultViaJoin = left.join(right).sortBy(identity).collect.toList

require(resultViaUnion.sortBy(identity).collect.toList == resultViaJoin)
require(resultViaSurrogates.sortBy(identity).collect.toList == resultViaJoin)

显然,这更像是一个草图,而不是一个完整的解决方案,但应该让您了解如何进行。与broadcast 相比的最大优势在于它消除了驱动程序瓶颈。

有没有办法让rdd3忘记它的依赖,让它在生产过程中不需要b1、b2,只需要rdd2?

您使用检查点和强制计算,但如果任何分区丢失,它仍然会失败。

【讨论】:

  • 一个新手问题:看起来如果left rdd 中的数据高度倾斜(即同一键的许多实例),那么rightWithSurrogates rdd 的大小将远大于原来的right rdd。从内存的角度来看,我们是否使问题复杂化了。我的猜测是不会有太大的不利影响,因为尽管规模很大,但密钥分布几乎是均匀的(所以没有一个执行者会承担很大的工作量)。只是想确认一下我的理解是否正确。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-01
  • 2020-02-06
  • 2016-01-03
相关资源
最近更新 更多