【问题标题】:Converting csv RDD to map将 csv RDD 转换为地图
【发布时间】:2018-03-28 05:05:59
【问题描述】:

我有一个大的 CSV(> 500 MB),我将它放入一个 spark RDD,我想将它存储到一个大的 Map[String, Array[Long]] 中。 CSV 有多个列,但我暂时只需要两列。第一列和第二列,格式为:

A 12312 [some_value] ....
B 123123[some_value] ....
A 1222 [some_value] ....
C 1231 [some_value] ....

我希望我的地图基本上按字符串分组并存储一个长数组 因此,对于上述情况,我的地图将是: {“A”:[12312, 1222],“B”:123123,“C”:1231}

但是由于这张地图会很大,我不能简单地直接这样做。 tca

我在 sql.dataframe 中获取 CSV

到目前为止我的代码(虽然看起来不正确):

def getMap(df: sql.DataFrame, sc: SparkContext): RDD[Map[String, Array[Long]]] = {
    var records = sc.emptyRDD[Map[String, Array[Long]]]
    val rows: RDD[Row] =  df.rdd
    rows.foreachPartition( iter => {
      iter.foreach(x =>
        if(records.contains(x.get(0).toString)){
        val arr = temp_map.getOrElse()
          records = records + (x.get(0).toString -> (temp_map.getOrElse(x.get(0).toString) :+ x.get(1).toString.toLong))
      }
        else{
          val arr = new Array[Long](1)
          arr(0) = x.get(1).toString.toLong
          records = records + (x.get(0).toString -> arr)
        }



      )
    })

  }

提前致谢!

【问题讨论】:

  • 您是否要为每个键列 1 查找列 2 的列表?
  • 我基本上是在尝试按第 1 列分组并将结果存储在映射中,如果我将字符串作为键,它会给我一个与键对应的值数组(第 2 列值) CSV。

标签: csv apache-spark apache-spark-sql rdd


【解决方案1】:

如果我正确理解了你的问题,那么

您可以groupBy 第一列和collect_list 第二列column

import org.apache.spark.sql.functions._
val newDF = df.groupBy("column1").agg(collect_list("column2"))
newDF.show(faslse)

val rdd = newDF.rdd.map(r => (r.getString(0), r.getAs[List[Long]](1)))

这将为您提供RDD[(String, List[Long])],其中字符串将是唯一的

【讨论】:

  • 我正要发布相同的答案。 GroupBy 应该可以完成这项工作。
  • 我为这个问题写了代码,所以想把它贴出来:P:D
  • 谢谢,如何查询给定键 x 的 RDD[(String, List[Long])]?
  • 您要进行什么查询?你可以使用filter
  • 我基本上想要针对特定​​字符串键快速检索列表。这就是我寻找地图的原因
猜你喜欢
  • 1970-01-01
  • 2022-01-12
  • 1970-01-01
  • 1970-01-01
  • 2015-11-11
  • 2014-08-09
  • 2017-11-21
  • 2017-04-08
相关资源
最近更新 更多