【发布时间】:2018-12-15 11:36:15
【问题描述】:
我有一个具有以下架构的 Spark Dataframe。
[{ "map": {
"completed-stages": 1,
"total-stages": 1 },
"rec": "test-plan",
"status": {
"state": "SUCCESS"
}
},
{ "map": {
"completed-stages": 1,
"total-stages": 1 },
"rec": "test-proc",
"status": {
"state": "FAILED"
}
}]
我想将其转换为具有以下架构的另一个 DF
[{"rec": "test-plan", "status": "SUCCESS"}, {"rec": "test-pROC", "status": "FAILED"}]
我编写了以下代码,但它无法编译并抱怨编码错误。
val fdf = DF.map(f => {
val listCommands = f.get(0).asInstanceOf[WrappedArray[Map[String, Any]]]
val m = listCommands.map(h => {
var rec = "none"
var status = "none"
if(h.exists("status" == "state" -> _)) {
status = (h.get("status") match {
case Some(x) => x.asInstanceOf[HashMap[String, String]].getOrElse("state", "none")
case _ => "none"
})
if(h.contains("rec")) {
rec = (h.get("rec") match {
case Some(x: String) => x
case _ => "none"
})
}
}
Map("status"->status, "rec"->rec)
})
val rm = m.flatten
rm
})
请提出正确的方法。
【问题讨论】:
标签: scala apache-spark apache-spark-sql