【问题标题】:spark shuffle write is super slowspark shuffle write 超级慢
【发布时间】:2018-06-13 02:53:57
【问题描述】:

为什么 1.6MB 随机写入和 2.4MB 输入的 spark shuffle 阶段如此缓慢?还有为什么随机写入只发生在一个执行程序上?我正在运行一个 3 节点集群,每个集群有 8 个核心。

火花用户界面:

代码:

*JavaPairRDD<String, String> javaPairRDD = c.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String arg0) throws Exception {
        // TODO Auto-generated method stub

        try {
            if (org.apache.commons.lang.StringUtils.isEmpty(arg0)) {
                return new Tuple2<String, String>("", "");
            }
            Tuple2<String, String> t = new Tuple2<String, String>(getESIndexName(arg0), arg0);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("******* exception in getESIndexName");
        }
        return new Tuple2<String, String>("", "");
    }
});

java.util.Map<String, Iterable<String>> map1 = javaPairRDD.groupByKey().collectAsMap();* 

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    为什么shuffle write只发生在一个executor上:

    请检查您的 RDD 分区,以下 UI 图像帮助您查找

    我认为您的 RDD 只有一个分区,而不是 8 个或更多,最终会利用所有执行程序。

    rdd = rdd.repartition(8) 
    

    Avoiding Shuffle "Less stage, run faster

    混洗是跨分区重新分配数据的过程(也称为重新分区),它可能会也可能不会导致跨 JVM 进程甚至通过网络(不同机器上的执行程序之间)移动数据。

    默认情况下,洗牌不会改变分区的数量,因为你只有一个分区,它看起来很慢。

    如何避免洗牌:

    • 当两个 RDD 都有重复的键时,join 会导致数据的大小急剧膨胀。执行 distinct 或 combineByKey 操作以减少键空间或使用 cogroup 处理重复键而不是产生完整的叉积可能会更好。通过在合并步骤中使用智能分区,可以防止连接中的第二次混洗(我们将在后面详细讨论)。

    • 如果两个 RDD 中都不存在密钥,则可能会意外丢失数据。使用外连接会更安全,这样可以保证将所有数据保留在左或右 RDD 中,然后在连接后过滤数据。

    • 1234563 .
    • 为了连接数据,Spark 需要将要连接的数据(即基于每个键的数据)存在于同一个分区上。 Spark 中连接的默认实现是随机散列连接。 shuffled hash join 确保每个分区上的数据将包含相同的键,方法是使用与第一个相同的默认分区器对第二个数据集进行分区,以便来自两个数据集的具有相同哈希值的键位于同一分区中。虽然这种方法总是有效的,但它可能比必要的成本更高,因为它需要洗牌。如果满足以下条件,则可以避免洗牌:

      1.两个 RDD 都有一个已知的分区器。

      1. 其中一个数据集小到足以放入内存,在这种情况下,我们可以进行广播哈希连接(稍后我们将解释这是什么)。

    请注意,如果 RDD 位于同一位置,则网络传输可以 避免,随着洗牌。重新分区后始终坚持

    • DataFrame Joins 在 DataFrame 之间连接数据是最常见的多 DataFrame 转换之一。标准 SQL 连接类型均受支持,可在执行连接时指定为 df.join(otherDf, sqlCondition, joinType) 中的 joinType。与 RDD 之间的连接一样,使用非唯一键连接将产生叉积(因此,如果左表有 R1 和 R2 和 key1,右表有 R3 和 R5 和 key1,你将得到 (R1, R3), (R1, R5), (R2, R3), (R2, R5)) 在输出中。

    • 使用自联接和 lit(true),您可以生成数据集的笛卡尔积,这很有用,但也说明了联接(尤其是自联接)如何很容易导致无法使用的数据大小。

    • 使用广播连接和广播连接,可以非常有效地连接大表(事实)和相对较小的表(维度),避免通过网络发送大表的所有数据。可以使用广播功能在连接运算符中使用时标记要广播的数据集。它使用 spark.sql.autoBroadcastJoinThreshold 设置来控制执行连接时将广播到所有工作节点的表的大小。

    • 使用相同的分区程序。如果两个 RDD 有相同的 partitioner,join 不会导致 shuffle。但是请注意,缺少 shuffle 并不意味着不必在节点之间移动数据。两个 RDD 可能具有相同的分区器(共同分区),但相应的分区位于不同的节点上(不位于同一位置)。这种情况仍然比洗牌要好,但要记住这一点。托管可以提高性能,但很难保证。

    • 如果数据量很大和/或您的集群无法增长甚至导致 OOM,请使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在一个循环中串行连接子分区,“附加”到同一个最终结果表。

    • https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

    • https://medium.com/@foundev/you-won-t-believe-how-spark-shuffling-will-probably-bite-you-also-windowing-e39d07bf754e
    • https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/ -https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-shuffle.html

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-19
      • 2021-09-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多