【发布时间】:2017-02-16 08:43:48
【问题描述】:
我有一个类似于((String, String), TimeStamp) 的 RDD。我有大量记录,我想为每个键选择 具有最新 TimeStamp 值的记录。我已经尝试了以下代码,但仍在努力解决这个问题。有人可以帮我做这个吗?
我尝试的以下代码是错误的,并且不能正常工作
val context = sparkSession.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", url)
.option("dbtable", "student_risk")
.option("user", "user")
.option("password", "password")
.load()
context.cache();
val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))
【问题讨论】:
-
z => (z._1, z._2)) 不正确。这块取而代之的是 course_id 和 student_id。您必须扩展第三个变量。我不擅长 scala,所以无法提供确切的代码。
-
顺便说一句 - 标题表明您正在寻找“最早的”,而文字说的是“最新的” - 我根据标题回答,显然这很容易改变。
-
使用该语言传达我的要求的小问题。我需要显示与每个键匹配的最早时间戳值的记录。
标签: scala apache-spark mapreduce