【发布时间】:2017-02-17 01:27:15
【问题描述】:
我是 Spark 的新手,我正在努力解决以下 Spark 问题。我有一个包含大量记录的表。表包含 student_id、course_id、risk_date、first_name、last_name。根据业务场景,一个 student_id 和 course_id 可以有多个 risk_dates。所以我需要为特定的 student_id 和 course_id 获取最新的 risk_date 的 student_id、course_id、risk_date。
如果我在 SQL 查询中提到我的 scanario,它会是这样的
select student_id, course_id, max(risk_date) from
students group by student_id, course_id
我的 Scala 代码如下所示。
val sqlCaller = sparkSession.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", url)
.option("dbtable", "student_risk")
.option("user", "dmin")
.option("password", "admin123")
.load()
sqlCaller.cache();
val studentRDD = sqlCaller.rdd.map(r => (r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id")), r.getTimestamp(r.fieldIndex("risk_date"))))
我可以使用过滤器来做到这一点吗?我不想使用 SQL 语句来获取符合我要求的数据。有人可以帮我做这件事吗?
【问题讨论】:
标签: scala apache-spark