【发布时间】:2016-05-10 23:58:31
【问题描述】:
Spark 版本 1.2.1 Scala 版本 2.10.4
我有 2 个由数字字段关联的 SchemaRDD:
RDD 1: (Big table - about a million records)
[A,3]
[B,4]
[C,5]
[D,7]
[E,8]
RDD 2: (Small table < 100 records so using it as a Broadcast Variable)
[SUM, 2]
[WIN, 6]
[MOM, 7]
[DOM, 9]
[POM, 10]
Result
[C,5, WIN]
[D,7, MOM]
[E,8, DOM]
[E,8, POM]
我想要 RDD1 中的 max(field),它是
我正在尝试通过以下方式使用 Merge 来解决此问题:
按键对RDD进行排序(在一个组内排序不会超过100条记录在该组内。在上面的例子中是在一个组内)
执行类似于合并排序的合并操作。在这里,我还需要跟踪先前的值以找到最大值;我仍然只遍历列表一次。
由于这里有太多变量,我得到“任务不可序列化”异常。这种实现方法是否正确?我试图在这里避免笛卡尔积。有没有更好的方法?
添加代码 -
rdd1.groupBy(itm => (itm(2), itm(3))).mapValues( itmorg => {
val miorec = itmorg.toList.sortBy(_(1).toString)
for( r <- 0 to miorec.length) {
for ( q <- 0 to rdd2.value.length) {
if ( (miorec(r)(1).toString > rdd2.value(q).toString && miorec(r-1)(1).toString <= rdd2.value(q).toString && r > 0) || r == miorec.length)
org.apache.spark.sql.Row(miorec(r-1)(0),miorec(r-1)(1),miorec(r-1)(2),miorec(r-1)(3),rdd2.value(q))
}
}
}).collect.foreach(println)
【问题讨论】:
-
Since there are too may variables here I am getting "Task not serializable" exception.似乎该异常可能是由于变量过多的其他原因。可以发一下代码吗?
标签: scala apache-spark mergesort serializable