【问题标题】:Selecting the Earliest and Latest Dates from a Grouped RDD从分组的 RDD 中选择最早和最晚的日期
【发布时间】:2017-02-18 09:15:31
【问题描述】:

我有一个形式为 (patientID, [Medication]) 的分组 RDD,其中 Medication 是以下案例类:

case class Medication(patientID: String, date: Date, medicine: String)

RDD 由以下行组成:

val grpMeds = medication.groupBy(_.patientID)

其中药物是 RDD[药物] 形式的 RDD。

对于每个患者,我都在尝试找出一种特定药物“medicine_A”的最早和最晚日期(请注意,药物是case class Medication 的一种方法)。我要获取的是格式为 RDD[patientID, earlyDate, latestDate] 的 RDD,但不知道如何获取。

任何帮助将不胜感激。数据(从grpMeds.take(0).foreach(println) 获得)的示例如下所示。

Medication(000961291-01,Tue Jun 21 19:45:00 UTC 2005,Isotonic Saline (0.9%))
Medication(000096430-01,Mon Nov 15 20:45:00 UTC 2010,insulin aspart)

【问题讨论】:

  • 那么使用minmax 有什么问题呢?样本数据和预期结果会有所帮助...
  • Date 是一个 java.utils.Date 函数。我不相信它有最小/最大方法,但我可以使用 date1.before(date2)。添加了 grpMeds.take(0).foreach(println) 返回的示例。

标签: sql scala apache-spark


【解决方案1】:

使用groupBy 是一种非常低效的方法。作为替代品,我建议使用 Spark SQL 或 reduceByKey

对于 Spark SQL,您应该将 medication 转换为 DataFrame

import spark.implicits._  // import sqlContext.implicits._

val medicationDF = medication.toDF

并使用groupBy,后跟agg

medicationDF.groupBy($"patientID", $"medicine").agg(min($"date"), max($"date"))

对于此解决方案,date 应为 java.sql.Datejava.sql.Timestamp

对于reduceByKey,首先你应该重塑medication 以获得由patientIdmedicine 组成的键和重复的date 的值:

val medicationPairs = medication.map(m => 
  ((m.patientID, m.medicine), (m.date, m.date))
)

下一个reduceByKey:

medicationPairs.reduceByKey { 
  case ((xMin, xMax), (yMin, yMax)) => (
    if(xMin.before(yMin)) xMin else yMin,
    if(xMax.after(yMax))  xMax else yMax
  )
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-29
    • 2017-04-30
    • 2021-06-29
    • 1970-01-01
    • 1970-01-01
    • 2010-10-21
    • 2021-08-31
    • 2021-06-19
    相关资源
    最近更新 更多