【问题标题】:How to pick up the earliest timestamp date from the RDD in scala如何从scala中的RDD中获取最早的时间戳日期
【发布时间】:2017-02-16 08:43:48
【问题描述】:

我有一个类似于((String, String), TimeStamp) 的 RDD。我有大量记录,我想为每个键选择 具有最新 TimeStamp 值的记录。我已经尝试了以下代码,但仍在努力解决这个问题。有人可以帮我做这个吗?

我尝试的以下代码是错误的,并且不能正常工作

val context = sparkSession.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", url)
  .option("dbtable", "student_risk")
  .option("user", "user")
  .option("password", "password")
  .load()
context.cache();

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))

【问题讨论】:

  • z => (z._1, z._2)) 不正确。这块取而代之的是 course_id 和 student_id。您必须扩展第三个变量。我不擅长 scala,所以无法提供确切的代码。
  • 顺便说一句 - 标题表明您正在寻找“最早的”,而文字说的是“最新的” - 我根据标题回答,显然这很容易改变。
  • 使用该语言传达我的要求的小问题。我需要显示与每个键匹配的最早时间戳值的记录。

标签: scala apache-spark mapreduce


【解决方案1】:

直接在DataFrame上很容易做到(这里奇怪地命名为context):

val result = context
  .groupBy("course_id", "student_id")
  .agg(min("risk_date_time") as "risk_date_time")

然后您可以像以前一样将其转换为 RDD(如果需要) - 结果具有相同的架构。

如果您确实想通过 RDD 执行此操作,请使用 reduceByKey

studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)

【讨论】:

  • 在尝试使用您提到的第一个选项的数据框时会出现编译错误?
  • 什么编译失败?您可能需要添加 import org.apache.spark.sql.functions._ 才能在范围内获取 min 函数
  • 我要明确一件事,t1和t2是指RDD的两条记录,t1代表的是student_id和course_id的复合吗?
  • 因为我使用reduceByKey,它将每条记录视为一个(键,值)元组,并且reduce函数对进行操作 - 所以,t1和t2 是具有相同键的任意两条记录的(时间戳!)。所以对于每一个键,所有匹配的记录都会经过这个函数,直到得到的记录(值,时间戳类型)仍然存在
【解决方案2】:

首先,您的代码提供了不正确的结果,因为 reduce 不正确。 reduce 函数返回一个 int(来自 compareTo)而不是 x,y 对,但 int 没有 ._2 成员。 要更正此尝试:

  studentRDD.collect().map(z => (z._1, z._2)).reduce((x ,y) => if (x._2.compareTo(y._2) < 0) x else y)._1

基本上,这个新函数会返回时间更短的记录,然后返回整体结果(最小的)你取键。

请注意,由于收集,您在驱动程序上执行所有这些操作。没有理由在 RDD 上收集、映射和减少工作,因此您可以通过执行以下操作获得相同的结果(并且仍然是可扩展的): studentRDD.map(z => (z._1, z._2)).reduce((x ,y) => if (x._2.compareTo(y._2)

您可以直接从上下文数据框中执行此操作:

val targetRow = context.agg(min(struct('risk_date_time, 'course_id, 'student_id)) as "rec").select($"rec.*").collect()(0)
val key = (targetRow.getString(1), targetRow.getString(2))

【讨论】:

    猜你喜欢
    • 2021-11-14
    • 1970-01-01
    • 2023-03-22
    • 2021-10-03
    • 2011-11-11
    • 1970-01-01
    • 1970-01-01
    • 2021-06-19
    • 2018-02-13
    相关资源
    最近更新 更多