【问题标题】:Scala--How to get the same part of the two RDDS?Scala——如何获得两个RDDS的相同部分?
【发布时间】:2018-07-11 08:11:07
【问题描述】:

有两个RDD:

val rdd1 = sc.parallelize(List(("aaa", 1), ("bbb", 4), ("ccc", 3)))
val rdd2 = sc.parallelize(List(("aaa", 2),  ("bbb", 5), ("ddd", 2))) 

如果我想通过第一个字段加入这些并得到如下结果:

List(("aaa", 1,2), ("bbb",4 ,5))

我应该编码什么?谢谢!!!!

【问题讨论】:

    标签: scala join rdd


    【解决方案1】:

    您可以join RDD 和map 将结果转换为所需的数据结构:

    val resultRDD = rdd1.join(rdd2).map{
      case (k: String, (v1: Int, v2: Int)) => (k, v1, v2)
    }
    // resultRDD: org.apache.spark.rdd.RDD[(String, Int, Int)] = MapPartitionsRDD[53] at map at <console>:32
    
    resultRDD.collect
    // res1: Array[(String, Int, Int)] = Array((aaa,1,2), (bbb,4,5))
    

    【讨论】:

      【解决方案2】:

      作为RDD[(String, Int)] 类型的RDDs,您可以简单地使用join 加入这两个RDDs,您将得到RDD[(String, (Int, Int))]。现在你想List[(String, (Int, Int))]你需要收集加入的RDD(如果加入的RDD很大,不推荐)并将其转换为List。试试下面的代码,

      val rdd1: RDD[(String, Int)] = sc.parallelize(List(("aaa", 1), ("bbb", 4), ("ccc", 3)))
      val rdd2: RDD[(String, Int)] = sc.parallelize(List(("aaa", 2), ("bbb", 5), ("ddd", 2)))
      
      //simply join two RDDs
      val joinedRdd: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
      
      //only if you want List then collect it (It is not recommended for huge RDDs)
      val lst: List[(String, (Int, Int))] = joinedRdd.collect().toList
      
      println(lst)
      
      //output
      //List((bbb,(4,5)), (aaa,(1,2)))
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-03-31
        • 2015-06-04
        • 2018-05-26
        • 1970-01-01
        • 2017-09-06
        • 2017-04-08
        • 1970-01-01
        • 2015-11-30
        相关资源
        最近更新 更多