【发布时间】:2016-07-01 10:46:13
【问题描述】:
我正在尝试将一些输入转换为我在 spark 数据框中想要的格式。 我的输入是这个案例类的序列,最多有 10,000,000 个类(或者在我将其转换为案例类之前也可能是 Json 字符串..):
case class Element(paramName: String, value: Int, time: Int)
因此,我想要一个这样的数据框:
|Time | ParamA | ParamB | ParamC | Param 10,000 |
|1000 | 432432 | 8768768 | Null....... | 75675678622 |
|2000 | Null.......| Null.........| 734543 | Null................. |
....
因此,并非必须为所有时隙定义每个参数。缺失值应该用 Null 填充。并且可能会有 10,000 个参数和大约 1000 个时隙。
从效率上看,我现在的做法似乎很糟糕:
case class Elements(name: String, value: Int, time: Int)
case class GroupedObjects(time: Int, params: (String, Int)*)
//elements contains the seq of Element
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
.groupBy(element => element.time)
.map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
(element.name, element.value)).toSeq: _*))
//transforming back to json string to get right format for RDD
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
"{\"time\":" + obj.time + obj.params.map(tuple =>
",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)
我在这里看到的问题肯定是改回字符串,只是以正确的格式再次读取它。如果有任何帮助向我展示如何以所需的数据框格式获取输入案例类,我将非常高兴。
以我现在的方式,它真的很慢,而且我得到了 10,000,000 行输入行的堆大小异常。
【问题讨论】:
标签: scala apache-spark dataframe apache-spark-sql rdd