【发布时间】:2017-09-06 19:51:33
【问题描述】:
我是 spark 的新手,有人可以帮我找到一种方法来组合两个 rdds 以根据 scala 中的以下逻辑创建最终 rdd,最好不使用 sqlcontext(dataframes) -
RDD1=column1,column2,column3 有 362825 条记录
RDD2=column2_distinct(与RDD1相同,但包含不同的值),column4有2621条记录
最终RDD=column1,column2,column3,column4
例子-
RDD1 =
userid | progid | Rating
a 001 5
b 001 3
b 002 4
c 003 2
RDD2=
progid(distinct) | id
001 1
002 2
003 3
最终 RDD=
userid | progid | id | rating
a 001 1 5
b 001 1 3
b 002 2 4
c 003 3 2
代码
val rawRdd1 = pairrdd1.map(x => x._1.split(",")(0) + "," + x._1.split(",")(1) + "," + x._2) //362825 records
val rawRdd2 = pairrdd2.map(x => x._1 + "," + x._2) //2621 records
val schemaString1 = "userid programid rating"
val schemaString2 = "programid id"
val fields1 = schemaString1.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val fields2 = schemaString2.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema1 = StructType(fields1)
val schema2 = StructType(fields2)
val rowRDD1 = rawRdd1.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2)))
val rowRDD2 = rawRdd2.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1)))
val DF1 = sparkSession.createDataFrame(rowRDD1, schema1)
val DF2 = sparkSession.createDataFrame(rowRDD2, schema2)
DF1.createOrReplaceTempView("df1")
DF2.createOrReplaceTempView("df2")
val resultDf = DF1.join(DF2, Seq("programid"))
val DF3 = sparkSession.sql("""SELECT df1.userid, df1.programid, df2.id, df1.rating FROM df1 JOIN df2 on df1.programid == df2.programid""")
println(DF1.count()) //362825 records
println(DF2.count()) //2621 records
println(DF3.count()) //only 297 records
期望与 DF1 具有相同数量的记录,并从 DF2 (id) 附加一个新列,该列具有来自 DF2 的相应 programid 值`
【问题讨论】:
-
您可以通过简单的连接来实现。
-
试过这样做,但加入没有给我正确的记录数
-
在问题中添加一些代码,以便我们检查逻辑。
-
如果你要使用数据集或数据框更好
-
@jamborta:在问题中添加了 sql 逻辑
标签: scala apache-spark