【问题标题】:Getting an apache spark dataframe in the right format以正确的格式获取 apache spark 数据帧
【发布时间】:2016-07-01 10:46:13
【问题描述】:

我正在尝试将一些输入转换为我在 spark 数据框中想要的格式。 我的输入是这个案例类的序列,最多有 10,000,000 个类(或者在我将其转换为案例类之前也可能是 Json 字符串..):

case class Element(paramName: String, value: Int, time: Int)

因此,我想要一个这样的数据框:

|Time | ParamA | ParamB | ParamC | Param 10,000 |  
|1000 | 432432 | 8768768 | Null....... | 75675678622 |  
|2000 | Null.......| Null.........| 734543 | Null................. |  

....
因此,并非必须为所有时隙定义每个参数。缺失值应该用 Null 填充。并且可能会有 10,000 个参数和大约 1000 个时隙。

从效率上看,我现在的做法似乎很糟糕:

case class Elements(name: String, value: Int, time: Int)

case class GroupedObjects(time: Int, params: (String, Int)*)

 //elements contains the seq of Element
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
  .groupBy(element => element.time)
  .map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
    (element.name, element.value)).toSeq: _*))

//transforming back to json string to get right format for RDD
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
  "{\"time\":" + obj.time + obj.params.map(tuple => 
     ",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)

我在这里看到的问题肯定是改回字符串,只是以正确的格式再次读取它。如果有任何帮助向我展示如何以所需的数据框格式获取输入案例类,我将非常高兴。
以我现在的方式,它真的很慢,而且我得到了 10,000,000 行输入行的堆大小异常。

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql rdd


    【解决方案1】:

    从 Spark 1.6 开始,有一个 pivot 函数。它适用于数据帧。由于您使用的是案例类,这很简单:

    val elementsRdd: RDD[Elements] = sc.parallelize(elements)
    val elementsDF = elementsRdd.toDF()
    

    你可以这样做:

    elementsDF.groupBy($"time").pivot(...)
    

    有关pivot() 的更多信息,请参阅GroupedData 的文档,但这应该足以让您继续。

    【讨论】:

    • 嗨,大卫,谢谢您的回复。我只是通过以下方式尝试了它:val dfTransformed = df.groupBy("time").pivot("name").sum("value") 总和只是为了取回一个数据框。这对于很多时间戳和少量参数(时间 1 到 1000000 和参数 2)非常有效,但是对于很多参数和较少的时间戳,这并不是很好。我的用例是 1000 个时间戳和 10,000 个不同的参数。例如,对于 100 个时间戳和 10,000 个,它需要永远并且比我的第一种方法更快地导致 java 堆空间异常。我做错了吗?
    • 例如,在我的第一种方法中,我可以使用 100 个时间戳处理 10,000 个参数,并且在将其设置为 1000 个时间戳时会出现堆异常。通过枢轴方法,我已经在 10,000 个参数和 100 个时间戳处获得了堆异常。
    • 我不认为你做错了什么。老实说,1.6 相对较新——另一个答案是动态构建行对象是我更熟悉的方式。你试过这样吗?由于这是新功能,因此您可能会将其推向超出其设计目的的范围。您可能想问那个具体的问题——为什么pivot 比另一种方法慢得多。话虽如此——我的理解是 Spark 是为长而细的数据设计的,而不是短而宽的数据。在 Spark 中拥有 1,000,000 列可能不会很有趣。
    • 如果你先求和怎么办:elemDF.groupBy($"time", $"name").agg(sum($"value") as "value").groupBy($"time").pivot("name").sum("value")
    【解决方案2】:

    您可能会尝试构建 Row 对象并手动定义 RDD 架构,类似于以下示例:

    // These extra imports will be required if you don't have them already
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
    
    //elements contains the seq of Element
    val elementsRdd = sc.parallelize(elements)
    
    val columnNames = elementsRdd.map(_.name).distinct().collect().sorted
    
    val pivoted = elementsRdd.groupBy(_.time).map {
      case (time, elemsByTime) =>
        val valuesByColumnName = elemsByTime.groupBy(_.name).map {
          case (name, elemsByTimeAndName) => (name, elemsByTimeAndName.map(_.value).sum)
        }
        val allValuesForRow = columnNames.map(valuesByColumnName.getOrElse(_, null))
        (time, allValuesForRow)
    }
    
    val schema = StructType(StructField("Time", IntegerType) :: columnNames.map(columnName => StructField(columnName, IntegerType, nullable = true)).toList)
    val rowRDD = pivoted.map(p => Row.fromSeq(p._1 :: p._2.toList))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show(10)
    

    我在本地尝试了 10,000,000 个这样的元素:

    val elements = (1 to 10000000).map(i => Element("Param" + (i % 1000).toString, i + 100, i % 10000))
    

    并且在合理的时间内成功完成。

    【讨论】:

      猜你喜欢
      • 2021-07-26
      • 2018-11-23
      • 1970-01-01
      • 1970-01-01
      • 2019-06-07
      • 1970-01-01
      • 2021-10-20
      • 1970-01-01
      • 2011-08-18
      相关资源
      最近更新 更多