【Title】: Spark - java.lang.ClassCastException when passing a column of type Array[Array[Map[String,String]]] to a UDF
【Posted】: 2021-04-02 21:51:44
【Question】:

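I am combining two Spark columns of type Array[Map[String,String]], which produces a new column of type Array[Array[Map[String,String]]]. However, I want to flatten that column so that I end up with a single column of type Array[Map[String,String]] containing the values of both original columns.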

I understand that from Spark 2.4 onward, flatten can be applied directly to the combined columns, like this:

df.withColumn("concatenation", flatten(array($"colArrayMap1", $"colArrayMap2")))

But I am still on Spark 2.2, so I need a UDF for this. Here is what I wrote:

def flatten_collection(arr: Array[Array[Map[String,String]]]) = {
    if(arr == null)
        null
    else
        arr.flatten
}
  
val flatten_collection_udf = udf(flatten_collection _)

df.withColumn("concatenation", array($"colArrayMap1", $"colArrayMap2")).withColumn("concatenation", flatten_collection_udf($"concatenation")).show(false)

But I get the following error:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<array<map<string,string>>>) => array<map<string,string>>)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:835)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:835)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:380)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[Lscala.collection.immutable.Map;

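I assume a cast fails inside the UDF, but why does it happen, and how can I avoid it?

Also, if anyone knows a solution for Spark 2.2 that does not require a UDF, even better.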

【Comments】:

    Tags: scala apache-spark apache-spark-sql user-defined-functions


    【Answer 1】:

    Adapted from the answer here. The UDF parameters need to be Seq rather than Array.

    import org.apache.spark.sql.functions.udf

    // Array columns reach a Scala UDF as Seq, so the parameters are typed as Seq
    def concat_arr(
        arr1: Seq[Map[String,String]],
        arr2: Seq[Map[String,String]]
    ): Seq[Map[String,String]] =
        arr1 ++ arr2

    val concatUDF = udf(concat_arr _)

    val df2 = df.withColumn("concatenation", concatUDF($"colArrayMap1", $"colArrayMap2"))
    
    df2.show(false)
    +--------------------+--------------------+----------------------------------------+
    |colArrayMap1        |colArrayMap2        |concatenation                           |
    +--------------------+--------------------+----------------------------------------+
    |[[a -> b], [c -> d]]|[[a -> b], [c -> d]]|[[a -> b], [c -> d], [a -> b], [c -> d]]|
    +--------------------+--------------------+----------------------------------------+
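
The same change can be applied to the flatten-style UDF from the question while keeping the array(...) step. What follows is a minimal sketch, not a definitive implementation: the object name, the local SparkSession settings, and the one-row sample DataFrame are assumptions made for illustration, chosen to mirror the output above. The null filter on inner arrays is also an addition that the original post does not include.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, udf}

object FlattenUdfSketch {
  // Spark passes array columns to Scala UDFs as Seq (WrappedArray), so the
  // parameter is declared as Seq[Seq[...]] rather than Array[Array[...]].
  def flattenCollection(arr: Seq[Seq[Map[String, String]]]): Seq[Map[String, String]] =
    if (arr == null) null
    else arr.filter(_ != null).flatten // skip null inner arrays (assumed behavior)

  def main(args: Array[String]): Unit = {
    // Hypothetical local session, for illustration only
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flatten-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data shaped like the columns in the question
    val df = Seq(
      (Seq(Map("a" -> "b"), Map("c" -> "d")), Seq(Map("a" -> "b"), Map("c" -> "d")))
    ).toDF("colArrayMap1", "colArrayMap2")

    val flattenCollectionUdf = udf(flattenCollection _)

    df.withColumn("concatenation",
        flattenCollectionUdf(array($"colArrayMap1", $"colArrayMap2")))
      .show(false)

    spark.stop()
  }
}
```

Compared with the two-argument concat_arr version, this keeps the shape of the original code, which may be convenient when more than two columns have to be combined.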
    

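    【Comments】:

    • A regular Array is a Java array, and it does not actually support everything that Seq does. To bridge that gap there is an implicit conversion to WrappedArray, but that conversion does not happen automatically inside Spark; you have to force it by declaring the parameter as Seq.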
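
The ClassCastException from the question can be reproduced without Spark. This is a minimal sketch, assuming (as the stack trace indicates) that the UDF receives the column value as a Seq implementation rather than a JVM array. A plain Scala Seq stands in for Spark's WrappedArray here, and the object and value names are illustrative.

```scala
import scala.util.Try

object SeqVsArrayCastSketch {
  // Stand-in for the value Spark hands to the UDF: a Seq, not a JVM array.
  val columnValue: Any = Seq(Seq(Map("a" -> "b")), Seq(Map("c" -> "d")))

  // Casting to the Array type declared by the original UDF fails at runtime.
  val asArray: Try[Int] =
    Try(columnValue.asInstanceOf[Array[Array[Map[String, String]]]].length)

  // Casting to Seq succeeds, and flatten then works as intended.
  val asSeq: Try[Seq[Map[String, String]]] =
    Try(columnValue.asInstanceOf[Seq[Seq[Map[String, String]]]].flatten)

  def main(args: Array[String]): Unit = {
    println(asArray.isFailure) // the Array cast throws ClassCastException
    println(asSeq)
  }
}
```

Here asArray is a Failure wrapping a ClassCastException, while asSeq holds the flattened maps. That is why declaring the UDF parameter as Seq avoids the error.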