【问题标题】:What is happening when I call rdd.join(rdd)当我调用 rdd.join(rdd) 时发生了什么
【发布时间】:2018-06-07 20:48:28
【问题描述】:

我正在开发一个应用程序,我需要对 RDD 中具有相同键的每一对行执行计算,这里是 RDD 结构:

List<Tuple2<String, Tuple2<Integer, Integer>>> dat2 = new ArrayList<>();
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(1, 1)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(2, 5)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(3, 78)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(1, 6)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(2, 11)));
JavaRDD<Tuple2<String, Tuple2<Integer, Integer>>> y2 = sc.parallelize(dat2);

现在,每个人的数据可以这样查看:(时间戳,值)。我想知道每一行在 +-1 时间戳中发生的值的数量。 (我知道这看起来像滑动窗口,但我想要事件级粒度

y2.join(y2);
resultOfJoin.filter(t -> t._2()._1()._1() - t._2()._2()._1() <= 1 && t._2()._1()._1() - t._2()._2()._1() >= -1)

在这种情况下,我找到的最佳解决方案是加入 RDD,为每个人创建 k^2 行,其中 k 是与此人关联的行数。

现在,我知道这是一场彻底的灾难。我知道这会导致洗牌(而且洗牌是不好的m'key),但我没有更好的办法。

我有 3 个问题:

  1. 由于我是在join之后立即过滤的,会不会影响join造成的压力(也就是说,会不会有优化)?
  2. 网络上传递的行数是多少? (我知道在最坏的情况下,结果 RDD 将有 n^2 行)在网络上发送的行将是 #workersn(仅发送一份副本并在 worker 上复制)还是 #workersn ^2(为结果工作器上的每 2 行组合发送行)?
  3. 如果我愿意与Dataset 合作,我可以加入过滤器。我了解数据集对计算图有额外的优化。如果我转移到数据集,我应该期望有多少改进(如果有的话)?

【问题讨论】:

    标签: java apache-spark join apache-spark-sql spark-dataframe


    【解决方案1】:

    由于我是在join之后进行过滤,会不会影响join造成的压力(也就是说,会不会有优化)?

    不,不会有优化。

    网络上传递的行数是多少?

    O(N)(特别是每条记录将被洗牌两次,每个父母一次)您通过键加入,因此每个项目都进入一个,并且只有一个分区。

    如果我愿意使用数据集,我可以加入过滤器。我了解数据集对计算图有额外的优化。如果我转换到数据集,我应该期望有多少改进(如果有的话)?

    Shuffle 过程得到了更好的优化,但除此之外,您不能指望任何针对特定情况的优化。

    希望知道每行在 +-1 时间戳中发生的值的数量。

    试试窗口函数:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions._
    
    val w = Window.partitionBy("id").ordetBy("timestamp")
    
    rdd.toDF("id", "data")
      .select($"id", $"data._1" as "timestamp", $"data._2" as "value"))
      .withColumn("lead", lead($"value", 1).over(w))
      .withColumn("lag", lag($"value", 1).over(w))
    

    【讨论】:

    • 首先,感谢您的回答。我接受了你的回答,但有一点需要注意(我的 OP 中没有特别提到,这就是我接受的原因)。 Window、lag 和 Lead 让您查看有序列表中的下一个元素,这意味着在以下情况下此计算是错误的: 1. 并非所有时间戳都有数据 2. 窗口内可能存在几行(例如,希望聚合1 秒时间窗口和行中的所有数据都具有纳秒粒度)。
    猜你喜欢
    • 1970-01-01
    • 2016-11-11
    • 1970-01-01
    • 2022-11-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多