【问题标题】:Removing parenthesis after joining RDDs加入 RDD 后去掉括号
【发布时间】:2019-03-13 23:14:57
【问题描述】:

我正在加入大量 rdd,我想知道是否有一种通用的方法可以删除每次加入时创建的括号。

这是一个小样本:

val rdd1 =  sc.parallelize(Array((1,2),(2,4),(3,6)))
val rdd2 =  sc.parallelize(Array((1,7),(2,8),(3,6)))
val rdd3 =  sc.parallelize(Array((1,2),(2,4),(3,6)))

val result = rdd1.join(rdd2).join(rdd3)

res: result: org.apache.spark.rdd.RDD[(Int, ((Int, Int), Int))] = Array((1,((2,7),2)), (3,((4,8),4)), (3,((4,8),6)), (3,((4,6),4)), (3,((4,6),6)))

我知道我可以使用地图

result.map((x) => (x._1,(x._2._1._1,x._2._1._2,x._2._2))).collect

Array[(Int, (Int, Int, Int))] = Array((1,(2,7,2)), (2,(4,8,4)), (3,(6,6,6)))

但是对于大量的rdd,每个都包含许多元素,很快就会很难使用这种方法

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    对于大量 rdd,每个都包含许多元素,这种方法根本行不通,因为最大的内置元组仍然是 Tuple22。如果你加入同构 RDD 某种类型的序列:

    def joinAndMerge(rdd1: RDD[(Int, Seq[Int])], rdd2: RDD[(Int, Seq[Int])]) = 
      rdd1.join(rdd2).mapValues{ case (x, y) => x ++ y }
    
    Seq(rdd1, rdd2, rdd3).map(_.mapValues(Seq(_))).reduce(joinAndMerge)
    

    如果你只有三个 RDD,使用cogroup 会更简洁:

    rdd1.cogroup(rdd2, rdd3)
      .flatMapValues { case (xs, ys, zs) => for {
        x <- xs; y <- ys; z <- zs 
      } yield (x, y, z) }
    

    如果值是异构的,使用DataFrames 更有意义:

    def joinByKey(df1: DataFrame, df2: DataFrame) = df1.join(df2, Seq("k"))
    
    Seq(rdd1, rdd2, rdd3).map(_.toDF("k", "v")).reduce(joinByKey)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-12-13
      • 1970-01-01
      • 2012-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多