【发布时间】:2017-01-20 03:00:21
【问题描述】:
我有 RDD[Row] :
|---itemId----|----Country-------|---Type----------|
| 11 | US | Movie |
| 11 | US | TV |
| 101 | France | Movie |
如何进行 GroupBy itemId 以便我可以将结果保存为 json 列表,其中每一行都是单独的 json 对象(RDD 中的每一行):
{"itemId" : 11,
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} },
{"itemId" : 101,
"Country": {"France" :1 },"Type": {"Movie" :1} }
RDD:
我试过了:
import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo
val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)
输入是 json 列表,其中每行是 json,我使用 MappingUtils 映射到 POJO 类 CountryInfo,它负责 JSON 解析和转换:
val MappingsList = input.map(x=> {
val countryInfo = MappingUtils.getCountryInfoString(x);
(countryInfo.getItemId(), countryInfo)
}).collectAsMap
MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]
def showCountryInfo(x: Option[CountryInfo]) = x match {
case Some(s) => s
}
val events = sqlContext.sql( "select itemId EventList")
val itemList = events.map(row => {
val itemId = row.getAs[String](1);
val çountryInfo = showTitleInfo(MappingsList.get(itemId));
val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry()
val type = countryInfo.getType()
Row(itemId, country, type)
})
有人可以告诉我如何实现这一目标吗?
谢谢!
【问题讨论】:
-
RDD[Row] 是否来自 DataFrame/DataSet?使用 RDD[Row] 虽然仍然可行,但通常并不理想。
-
我从数据集创建了 RDD。
-
@ASpotySpot 用我的 RDD 更新
标签: json scala apache-spark rdd spark-dataframe