【Question title】:Spark: How to access elements when the type is GenericRowWithSchema
【Posted】:2021-08-29 07:57:41
【Question】:

I am trying to parse a JSON array, but I get the error below. Any suggestions?

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable

case class MyData(c1: String, c2: String)

object ParseJSON extends App {
  val sparkSess = SparkSession.builder().master("local[2]").getOrCreate()
  val sc = sparkSess.sparkContext

  val dfJSON: Dataset[Row] = sparkSess.read.format("json").option("multiLine", true).option("inferSchema", true).load("C:/test.json")

  dfJSON.printSchema()

  import sparkSess.implicits._
  val dfParsed = dfJSON.flatMap(rec => {
    val header = rec.getAs("Subject").asInstanceOf[Row].asInstanceOf[Row].getAs("Header").asInstanceOf[mutable.WrappedArray[Seq[Row]]]
    header.flatten.foreach(t => {
      println(t.get(0) + " " + t.get(1))
    })
    Seq(MyData("", ""))
  })
  dfParsed.show(5000, false)
}

test.json

{ "Subject": { "Header": [ { "units": "k1", "label": "l1" }, { "units": "k2", "label": "l2" }, { "units": "k3", "label": "l3" } ] } }

21/06/13 01:51:47 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to class scala.collection.GenTraversableOnce (org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema and scala.collection.GenTraversableOnce are in unnamed module of loader 'app')
    at scala.collection.generic.GenericTraversableTemplate$$anonfun$flatten$1.apply(GenericTraversableTemplate.scala:172)
    at scala.collection.generic.GenericTraversableTemplate$$anonfun$flatten$1.apply(GenericTraversableTemplate.scala:171)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.generic.GenericTraversableTemplate$class.flatten(GenericTraversableTemplate.scala:171)
    at scala.collection.AbstractTraversable.flatten(Traversable.scala:104)
    at ParseJSON$$anonfun$2.apply(ParseJSON.scala:32)
    at ParseJSON$$anonfun$2.apply(ParseJSON.scala:25)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)

【Comments】:

    Tags: json apache-spark


    【Solution 1】:

    I assume the output you want is a DataFrame with units and label columns.

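    import org.apache.spark.sql.functions.{col, explode}

    // df is the DataFrame read from test.json (dfJSON in the question)
    df
      .withColumn("newHeader", explode(col("Subject.Header")))
      .select(col("newHeader.label").as("label"), col("newHeader.units").as("units"))
      .show(false)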
    
    
    +-----+-----+
    |label|units|
    +-----+-----+
    |l1   |k1   |
    |l2   |k2   |
    |l3   |k3   |
    +-----+-----+
    

    Another approach is to pass the Header column to a UDF and, inside the UDF, read the fields by name:

    header
    .map(element => element.getAs[String]("units"))
    .toList
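
The UDF approach can be sketched more fully as follows. This is a minimal sketch, not the answerer's exact code: the object name HeaderUdfSketch, the UDF name unitsOf, and the inlined sample string are assumptions made for illustration, and it assumes a Spark 2.x style setup like the one in the stack trace, where an array-of-struct column reaches a Scala UDF as Seq[Row].

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object HeaderUdfSketch {
  // An array<struct> column is handed to a Scala UDF as Seq[Row];
  // each Row carries its schema, so fields can be read by name.
  val unitsOf = udf { (header: Seq[Row]) =>
    header.map(element => element.getAs[String]("units")).toList
  }

  // Same content as test.json, inlined so the sketch needs no file (assumption).
  val sample: String =
    """{"Subject":{"Header":[{"units":"k1","label":"l1"},{"units":"k2","label":"l2"},{"units":"k3","label":"l3"}]}}"""

  // Reads the sample, applies the UDF, and returns the units of the first record.
  def run(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val df = spark.read.json(Seq(sample).toDS())
    df.select(unitsOf(col("Subject.Header")).as("units"))
      .head()
      .getSeq[String](0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

For a simple projection like this, the explode-based version above is usually preferable, since built-in functions stay visible to the optimizer while a UDF is opaque to it.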
    

    【Comments】:

      【Solution 2】:

      The problem was solved by changing the cast: the elements of Header are Row values, not Seq[Row]. See below.

      From

      val header = rec.getAs("Subject").asInstanceOf[Row].asInstanceOf[Row].getAs("Header").asInstanceOf[mutable.WrappedArray[Seq[Row]]]

      To

      val header = rec.getAs("Subject").asInstanceOf[Row].asInstanceOf[Row].getAs("Header").asInstanceOf[mutable.WrappedArray[Row]]
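
Why the original cast was accepted and only failed later, inside flatten, can be illustrated without Spark. The sketch below uses a hypothetical FakeRow case class as a stand-in for Row (an assumption for illustration only, not part of the Spark API). Generic type arguments are erased on the JVM, so asInstanceOf checks only the outer collection type; the mistake surfaces when an element is first treated as a collection, which matches the ClassCastException raised from flatten in the stack trace.

```scala
object ErasureDemo {
  // Hypothetical stand-in for a Spark Row; for illustration only.
  final case class FakeRow(units: String, label: String)

  // Like the value returned by an untyped getAs, this is statically just Any.
  val raw: Any = Seq(FakeRow("k1", "l1"), FakeRow("k2", "l2"))

  // Unchecked cast: it succeeds at runtime because only the outer Seq is checked.
  val wrong: Seq[Seq[FakeRow]] = raw.asInstanceOf[Seq[Seq[FakeRow]]]

  // Cast that matches the data: the elements really are FakeRow values.
  val right: Seq[FakeRow] = raw.asInstanceOf[Seq[FakeRow]]

  def main(args: Array[String]): Unit = {
    // flatten treats every element as a collection, so the bad cast fails here.
    val attempt = scala.util.Try(wrong.flatten)
    println(attempt.isFailure)

    // With the matching element type, the fields are accessible directly.
    right.foreach(r => println(r.units + " " + r.label))
  }
}
```

The same reasoning applies to the question's code: with the element type changed to Row, header.foreach can be used directly and the flatten call is no longer needed.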
      

      【Comments】:
