【问题标题】:spark rdd: grouping and filteringspark rdd:分组和过滤
【发布时间】:2019-07-16 01:18:05
【问题描述】:

我有一个对象的 Rdd“labResults”:

case class LabResult(patientID: String, date: Long, labName: String, value: String)

我想转换这个 rdd,使它只包含一个用于每个 patientID 和 labName 组合的行。此行应该是该患者 ID 和 labName 组合的最新行(我只对患者进行此实验室的最新日期感兴趣)。我是这样做的:

//group rows by patient and lab and take only the last one
val cleanLab = labResults.groupBy(x => (x.patientID, x.labName)).map(_._2).map { events =>
  val latest_date = events.maxBy(_.date)
  val lab = events.filter(x=> x.date == latest_date)
  lab.take(1)
}

我想从这个 RDD 创建边缘:

val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab
  .map({ lab =>
    Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
  })

我得到一个错误:

value patientID is not a member of Iterable[edu.gatech.cse6250.model.LabResult]

[错误] 边缘(lab.patientID.toLong,lab2VertexId(lab.labName),PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty]) [错误] ^ [错误] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:53:值labName不是Iterable的成员[edu.gatech.cse6250.model.LabResult] [错误] 边缘(lab.patientID.toLong,lab2VertexId(lab.labName),PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty]) [错误] ^ [错误] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:86:类型不匹配; [错误] 发现:Iterable[edu.gatech.cse6250.model.LabResult] [错误] 必需:edu.gatech.cse6250.model.LabResult [错误] Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])

所以,看起来问题是“cleanLab”也不是我预期的 LabResult 的 RDD,而是 Iterable[edu.gatech.cse6250.model.LabResult] 的 RDD [edu.gatech.cse6250.model.LabResult]

我该如何解决?

【问题讨论】:

  • 你可以用lab.head代替lab.take(1)

标签: scala apache-spark rdd


【解决方案1】:

这是我第一部分的方法。关于 Edge 和其他课程的内容我无法提供帮助,因为我不知道它们来自哪里(是来自 here 吗?)

scala> val ds = List(("1", 1, "A", "value 1"), ("1", 3, "A", "value 3"), ("1", 3, "B", "value 3"), ("1", 2, "A", "value 2"), ("1", 3, "B", "value 3"), ("1", 5, "B", "value 5") ).toDF("patientID", "date", "labName", "value").as[LabResult]
ds: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, date: int ... 2 more fields]

scala> ds.show
+---------+----+-------+-------+
|patientID|date|labName|  value|
+---------+----+-------+-------+
|        1|   1|      A|value 1|
|        1|   3|      A|value 3|
|        1|   3|      B|value 3|
|        1|   2|      A|value 2|
|        1|   3|      B|value 3|
|        1|   5|      B|value 5|
+---------+----+-------+-------+


scala> val grouped = ds.groupBy("patientID", "labName").agg(max("date") as "date")
grouped: org.apache.spark.sql.DataFrame = [patientID: string, labName: string ... 1 more field]

scala> grouped.show
+---------+-------+----+
|patientID|labName|date|
+---------+-------+----+
|        1|      A|   3|
|        1|      B|   5|
+---------+-------+----+


scala> val cleanLab = ds.join(grouped, Seq("patientID", "labName", "date")).as[LabResult]
cleanLab: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, labName: string ... 2 more fields]

scala> cleanLab.show
+---------+-------+----+-------+
|patientID|labName|date|  value|
+---------+-------+----+-------+
|        1|      A|   3|value 3|
|        1|      B|   5|value 5|
+---------+-------+----+-------+


scala> cleanLab.head
res45: LabResult = LabResult(1,3,A,value 3)

scala>

【讨论】:

    猜你喜欢
    • 2015-06-15
    • 2020-12-14
    • 2015-06-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-05
    • 1970-01-01
    • 2019-06-16
    相关资源
    最近更新 更多