【问题标题】:Scala : How to do GroupBy sum for String values?Scala:如何对字符串值进行 GroupBy 求和?
【发布时间】:2017-01-20 03:00:21
【问题描述】:

我有 RDD[Row] :

  |---itemId----|----Country-------|---Type----------|
  |     11      |     US           |      Movie      | 
  |     11      |     US           |      TV         | 
  |     101     |     France       |      Movie      |     

如何进行 GroupBy itemId 以便我可以将结果保存为 json 列表,其中每一行都是单独的 json 对象(RDD 中的每一行):

{"itemId" : 11, 
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} },
{"itemId" : 101, 
"Country": {"France" :1 },"Type": {"Movie" :1} }

RDD:

我试过了:

import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo


val mappingPath = "s3://.../"    
val input = sc.textFile(mappingPath)

输入是 json 列表,其中每行是 json,我使用 MappingUtils 映射到 POJO 类 CountryInfo,它负责 JSON 解析和转换:

val MappingsList = input.map(x=> {
                    val countryInfo = MappingUtils.getCountryInfoString(x);
                    (countryInfo.getItemId(), countryInfo)
                 }).collectAsMap

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match {
      case Some(s) => s
   }


val events = sqlContext.sql( "select itemId  EventList")

val itemList =  events.map(row => {
    val itemId = row.getAs[String](1);
    val çountryInfo =  showTitleInfo(MappingsList.get(itemId));
    val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry()
    val type = countryInfo.getType()

    Row(itemId, country, type)
      })

有人可以告诉我如何实现这一目标吗?

谢谢!

【问题讨论】:

  • RDD[Row] 是否来自 DataFrame/DataSet?使用 RDD[Row] 虽然仍然可行,但通常并不理想。
  • 我从数据集创建了 RDD。
  • @ASpotySpot 用我的 RDD 更新

标签: json scala apache-spark rdd spark-dataframe


【解决方案1】:

我负担不起额外的时间来完成这个,但可以给你一个开始。

这个想法是您将RDD[Row] 聚合到一个表示您的 JSON 结构的 Map 中。聚合是一个折叠,需要两个函数参数:

  1. seqOp如何将元素集合折叠成目标类型
  2. combOp 如何合并两个目标类型。

在合并时,棘手的部分出现在combOp,因为您需要累积在seqOp 中看到的值的计数。我把它作为一个练习,因为我有一架飞机要赶!如果您遇到问题,希望其他人可以填补空白。

  case class Row(id: Int, country: String, tpe: String)

  def foo: Unit = {

    val rows: RDD[Row] = ???

    def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = {
      acc.get(r.id) match {
        case None => acc.updated(r.id, (Map(r.country, 1), Map(r.tpe, 1)))
        case Some((countries, types)) =>
          val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1)
          val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1)
          acc.updated(r.id, (countries_, types_))
      }
    }

    val z = Map.empty[Int, (Map[String, Int], Map[String, Int])]

    def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = {
      l.foldLeft(z) { case (acc, (id, (countries, types))) =>
          r.get(id) match {
            case None => acc.updated(id, (countries, types))
            case Some(otherCountries, otherTypes) => 
              // todo - continue by merging countries with otherCountries
              // and types with otherTypes, then update acc
          }
      }
    }

    val summaryMap = rows.aggregate(z) { seqOp, combOp }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-12-29
    • 2023-02-03
    • 1970-01-01
    • 2018-10-22
    • 2019-08-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多