【问题标题】:Combine two rdds合并两个rdds
【发布时间】:2017-09-06 19:51:33
【问题描述】:

我是 spark 的新手,有人可以帮我找到一种方法来组合两个 rdds 以根据 scala 中的以下逻辑创建最终 rdd,最好不使用 sqlcontext(dataframes) -

RDD1=column1,column2,column3 有 362825 条记录

RDD2=column2_distinct(与RDD1相同,但包含不同的值),column4有2621条记录

最终RDD=column1,column2,column3,column4

例子-

RDD1 =

  userid |  progid  |  Rating
       a       001     5
       b       001     3
       b       002     4
       c       003     2

RDD2=

   progid(distinct) |   id

   001                  1
   002                  2
   003                  3

最终 RDD=

    userid  | progid  | id | rating
        a       001      1   5
        b       001      1   3
        b       002      2   4
        c       003      3   2

代码

val rawRdd1 = pairrdd1.map(x => x._1.split(",")(0) + "," + x._1.split(",")(1) + "," + x._2) //362825 records    

val rawRdd2 = pairrdd2.map(x => x._1 + "," + x._2) //2621 records    

val schemaString1 = "userid programid rating"    

val schemaString2 = "programid id"    

val fields1 = schemaString1.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))    

val fields2 = schemaString2.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))    

val schema1 = StructType(fields1)    
val schema2 = StructType(fields2)    


val rowRDD1 = rawRdd1.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2)))    

val rowRDD2 = rawRdd2.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1)))    

val DF1 = sparkSession.createDataFrame(rowRDD1, schema1)    

val DF2 = sparkSession.createDataFrame(rowRDD2, schema2)    


DF1.createOrReplaceTempView("df1")    

DF2.createOrReplaceTempView("df2")    


val resultDf = DF1.join(DF2, Seq("programid"))    

val DF3 = sparkSession.sql("""SELECT df1.userid, df1.programid, df2.id, df1.rating FROM  df1 JOIN df2 on df1.programid == df2.programid""")    


println(DF1.count())  //362825 records    

println(DF2.count())  //2621 records    

println(DF3.count())  //only 297 records

期望与 DF1 具有相同数量的记录,并从 DF2 (id) 附加一个新列,该列具有来自 DF2 的相应 programid 值`

【问题讨论】:

  • 您可以通过简单的连接来实现。
  • 试过这样做,但加入没有给我正确的记录数
  • 在问题中添加一些代码,以便我们检查逻辑。
  • 如果你要使用数据集或数据框更好
  • @jamborta:在问题中添加了 sql 逻辑

标签: scala apache-spark


【解决方案1】:

它有点难看,但应该可以工作(Spark 2.0):

 val rdd1 = sparkSession.sparkContext.parallelize(List("a,001,5", "b,001,3", "b,002,4","c,003,2"))
 val rdd2 = sparkSession.sparkContext.parallelize(List("001,1", "002,2", "003,3"))

 val groupedRDD1 = rdd1.map(x => (x.split(",")(1),x))
 val groupedRDD2 = rdd2.map(x => (x.split(",")(0),x))
 val joinRDD = groupedRDD1.join(groupedRDD2)
 // convert back to String
 val cleanJoinRDD = joinRDD.map(x => x._1 + "," + x._2._1.replace(x._1 + ",","") + "," + x._2._2.replace(x._1 + ",",""))
 cleanJoinRDD.collect().foreach(println)

我认为更好的选择是使用 spark SQL

【讨论】:

  • 谢谢,我试过了,但在最终的 rdd 中没有得到正确的记录数-val groupedRDD1 = rdd1.map(x => (x.split(",")(1),x)) // 362825 records ` val groupedRDD2 = rdd2.map(x => (x.split(",") (0),x)) // 2621 条记录(因为只包含键列的唯一值)`val cleanJoinRDD = joinRDD.map(x => x._1 + "," + x._2._1.replace(x._1 + ",","") + "," + x._2._2.replace(x._1 + ",","")) //297 records 而我期望 cleanJoinRdd 具有与 rdd1 相同数量的记录,即 362825 具有相应的值(来自键rdd2 )作为新列。
  • 为什么你不能使用数据帧/数据集?
  • 试过用那个..用代码更新了问题。仍然没有得到想要的结果。
【解决方案2】:

首先,为什么要再次拆分、连接和拆分行?您可以一步完成:

val rowRdd1 = pairrdd1.map{x => 
    val (userid, progid) = x._1.split(",") 
    val rating = x._2
    Row(userid, progid, rating) 
}

我猜你的问题可能是你的键中有一些额外的字符,所以在连接中不匹配。一种简单的方法是执行left join 并检查它们不匹配的行。

这可能是行中的额外空间,您可以像这样为两个 rdds 修复:

val rowRdd1 = pairrdd1.map{x =>  
    val (userid, progid) = x._1.split(",").map(_.trim)
    val rating = x._2
    Row(userid, progid, rating) 
}

【讨论】:

  • 我的最终目标是将最终的 rdd 输入到 org.apache.spark.mllib.recommendation.Rating 方法中,但是由于该方法只接受整数作为产品并且我的值长达 17 位,这就是为什么我必须映射它到“id”列(向下转换)之类的内容,如果您觉得有更好的方法在 ALS 的评级方法中提供长类型,请告诉我。
  • @Amit 我的主要观点是关于用于连接的键中的额外字符。继续就如何提供Rating 方法提出一个单独的问题。
猜你喜欢
  • 2015-06-04
  • 1970-01-01
  • 2017-03-31
  • 2019-05-07
  • 1970-01-01
  • 2017-04-08
  • 2015-06-29
  • 2016-03-25
  • 2018-07-11
相关资源
最近更新 更多