【发布时间】:2016-04-11 21:55:32
【问题描述】:
说明:
我们的 spark 版本是 1.4.1
我们想加入两个巨大的 RDD,其中一个带有倾斜数据。所以 spark rdd 操作 join 可能会导致内存问题。我们尝试将较小的一个分成几部分,然后分批广播它们。在每个广播轮次中,我们尝试将较小的 rdd 的一部分收集到驱动程序,然后将其保存到 HashMap,然后广播 HashMap。每个 executor 使用广播值对较大的 rdd 进行 map 操作。我们通过这种方式实现了我们的倾斜数据连接。
但是当它在每回合处理广播值时。我们发现我们无法在处理后破坏我们的广播值。如果我们使用broadcast.destroy(),下一轮我们处理数据将 触发错误。像这样:
java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)
我们查看了spark的源码,发现这个问题是由rdd依赖关系导致的。如果 rdd3 -> rdd2 -> rdd1 (箭头显示依赖关系)。 rdd1 使用名为 b1 的广播变量生成,rdd2 使用 b2。在生成 rdd3 时,源代码显示它需要序列化 b1 和 b2。如果 b1 或 b2 在 rdd3 生成过程之前被销毁。这将导致我在上面列出的失败。
问题:
有没有办法让rdd3忘记它的依赖,让它在生产过程中不需要b1、b2,只需要rdd2?
或者是否存在处理倾斜连接问题的方法?
顺便说一句,我们为每个回合设置了检查点。并将 spark.cleaner.ttl 设置为 600。问题仍然存在。如果我们不销毁广播变量,执行器将在第 5 回合丢失。
我们的代码是这样的:
for (int i = 0; i < times; i++) {
JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;
List<Tuple2<String, Double>> itemSplit = itemZippedRdd
.filter(new FilterByHashFunction(times, i))
.collect();
Map<String, Double> itemSplitMap = new HashMap<String, Double>();
for (Tuple2<String, Double> item : itemSplit) {
itemSplitMap.put(item._1(), item._2());
}
Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
.broadcast(itemSplitMap);
curItemPairRdd = prevItemPairRdd
.mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
.persist(StorageLevel.DISK_ONLY());
curItemPairRdd.count();
itemSplitBroadcast.destroy(true);
itemSplit.clear();
}
【问题讨论】:
-
所以你尝试加入两个 RDD,但没有成功,你决定自己重新实现加入。我质疑这个决定。为什么你认为你的 join 实现会比 Spark 的更好?我宁愿建议你解决加入的问题。将这些问题发布为 Stack Overflow 问题。
标签: apache-spark