【问题标题】:Create a Sequence of case class objects from a Spark DataFrame从 Spark DataFrame 创建一个案例类对象序列
【发布时间】:2020-08-11 21:29:14
【问题描述】:

如何遍历 Spark DataFrame 行并将它们添加到案例类对象序列中?

DF1:

val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

案例分类:

case class ValuePerNumber(num:String, wrd:String, defaultID:Int, size: Long=0) {}

预期输出:

Seq(ValuePerNumber("202003101750", "202003101700",0, 122), ValuePerNumber("202003101800", "202003101700",0, 12), ValuePerNumber("202003101750", "202003101700",0, 42)) 

在每种情况下,我都可以将 defaultID 设为 0。 我不知道如何处理和解决这个问题,非常感谢任何解决方案/建议!

我尝试了以下方法:

val x = someDF.as[ValuePerNumber].collect()

我收到以下错误:

org.apache.spark.sql.AnalysisException: cannot resolve '`num`' given input columns: [number, word, value];

编辑:如果问题/解决方案对您有帮助,请投票,这反过来会在这个论坛中帮助我。

【问题讨论】:

  • 如果你想得到一个类的序列,你需要collect()驱动数据。另一种方法是创建一个Dataset[ValuePerNumber]
  • @Shaido-ReinstateMonica 我已经用我尝试过的内容以及我的使用方式编辑了我的帖子collect 似乎在这里不起作用。
  • 您的列名必须与案例类的列名匹配。
  • 重命名列?

标签: scala apache-spark apache-spark-sql scala-collections


【解决方案1】:
val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

case class ValuePerNumber(number:String, word:String, defaultID:Int, value: Long)

someDF.withColumn("defaultId", lit(0)).as[ValuePerNumber].collect.toSeq

【讨论】:

    【解决方案2】:

    DataFrame 和 Case Class 中的列数和名称数应匹配以直接在 DataFrame 上使用 as[ValuePerNumber] 而无需提取值。

    1. size 在 DataFrame 中不可用,因此使用 withColumn 添加
    2. DF 和 Case 类中的列名不匹配。修改为匹配 DF 和案例类。
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val someDF = Seq(("202003101750", "202003101700",122),("202003101800", "202003101700",12),("202003101750", "202003101700",42))
    .toDF("number", "word","value")
    .withColumn("size",lit(0)) // Added this to match your case class columns
    
    
    // Exiting paste mode, now interpreting.
    
    someDF: org.apache.spark.sql.DataFrame = [number: string, word: string ... 2 more fields]
    
    scala> case class ValuePerNumber(number:String, word:String, value:Int, size: Long=0) // Modified column names to match your dataframe column names.
    defined class ValuePerNumber
    
    scala> someDF.as[ValuePerNumber].show(false)
    +------------+------------+-----+----+
    |number      |word        |value|size|
    +------------+------------+-----+----+
    |202003101750|202003101700|122  |0   |
    |202003101800|202003101700|12   |0   |
    |202003101750|202003101700|42   |0   |
    +------------+------------+-----+----+
    
    
    scala>
    

    【讨论】:

      【解决方案3】:

      您可以将Dataset[ValuePeerNumber]collect 创建为Seq

      val someDF = Seq(
        ("202003101750", "202003101700",122),
        ("202003101800", "202003101700",12),
        ("202003101750", "202003101700",42)
      ).toDF("number", "word","value")
      
      val result = someDF.map(r => ValuePerNumber(r.getAs[String](0), r.getAs[String](1), r.getAs[Int](2))).collect().toSeq
      

      您还可以在数据框中添加列并编辑列名以匹配您可以直接执行的案例类

      val x = someDF.as[ValuePerNumber].collect()
      

      【讨论】:

      • 感谢我正在寻找的东西!!!更准确地说:val result = someDF.map(r => ValuePerNumber(r.getAs[String](0), r.getAs[String](1), 0, r.getAs[Int](2))).collect().toSeq
      猜你喜欢
      • 2018-08-03
      • 1970-01-01
      • 2019-07-08
      • 2020-01-31
      • 1970-01-01
      • 2017-05-17
      • 1970-01-01
      • 2017-12-13
      • 1970-01-01
      相关资源
      最近更新 更多