【Question title】: Error mapping simple values from a Map to a Spark DataFrame
【Posted】: 2020-04-10 11:00:19
【Question description】:

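I recently started using Spark with Scala, and I find myself wanting to map some values from a hashmap/Map onto a DataFrame without having to build a new DataFrame and then do some kind of join.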

I have this DataFrame:

+---+-------+---+----------+---------+
| id|   name|age|      date|genderKey|
+---+-------+---+----------+---------+
|  1|Rodrigo| 30|2019-01-01|     male|
|  2|Roberto| 23|2019-01-01|     male|
|  3|Roberto| 25|2019-01-01|     male|
|  4|Rodrigo| 30|2019-01-01|     male|
|  5|Mariana| 32|2019-01-01|   female|
+---+-------+---+----------+---------+

And this Map structure:

var genderMap = Map[String, String](
    "male" -> "Masculine",
    "female" -> "Feminine"
)

I want to add a column with the mapped values to the DataFrame through a user-defined function:

val getGenderName = udf((gender:String)=>genderMap(gender))

dfPeople
    .withColumn("genderName", getGenderName(col("genderKey")))
    .show()

When I call show, I get this error:

sqlfunc: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))
getGender: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
java.lang.InternalError: Malformed class name
  at java.lang.Class.getSimpleBinaryName(Class.java:1450)
  at java.lang.Class.getSimpleName(Class.java:1309)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1048)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1047)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1000)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
  at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
  at org.apache.spark.sql.execution.BaseLimitExec$class.doProduce(limit.scala:70)
  at org.apache.spark.sql.execution.LocalLimitExec.doProduce(limit.scala:97)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.LocalLimitExec.produce(limit.scala:97)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 118 elided
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -10
  at java.lang.String.substring(String.java:1931)
  at java.lang.Class.getSimpleBinaryName(Class.java:1448)
  ... 185 more

Any ideas?

Thanks!

【Question discussion】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    Spark 2.2 and later

    You don't need a udf here; use the built-in typedLit function to create the lookup instead.

    Example:

    import org.apache.spark.sql.functions._
    // toDF also needs spark.implicits._ (spark-shell imports it automatically)
    
    val df=Seq(("1","Rodrigo","30","2019-01-01","male"),("1","Rodrigo","30","2019-01-01","female"),("1","Rodrigo","30","2019-01-01","mal")).toDF("id","name","age","date","genderKey")
    
    // typedLit turns the Scala Map into a literal map column
    val genderMap=typedLit(Map("male" -> "Masculine","female" -> "Feminine"))
    //genderMap: org.apache.spark.sql.Column = keys: [male,female], values: [Masculine,Feminine]
    
    // a key missing from the map yields null here, which coalesce replaces with "not found"
    df.withColumn("genderName",coalesce(genderMap(col("genderKey")),lit("not found"))).show()
    //+---+-------+---+----------+---------+----------+
    //| id|   name|age|      date|genderKey|genderName|
    //+---+-------+---+----------+---------+----------+
    //|  1|Rodrigo| 30|2019-01-01|     male| Masculine|
    //|  1|Rodrigo| 30|2019-01-01|   female|  Feminine|
    //|  1|Rodrigo| 30|2019-01-01|      mal| not found|
    //+---+-------+---+----------+---------+----------+
    

    Using a UDF

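    // plain Scala Map used for the lookup (val, since it is never reassigned)
    val genderMap = Map[String, String](
        "male" -> "Masculine",
        "female" -> "Feminine"
    )
    
    // build the UDF from a method that takes the map as a parameter;
    // getOrElse supplies a fallback for keys that are not in the map
    def getGenderName(genderMap:Map[String,String]) = udf((gender:String) => genderMap.getOrElse(gender,"not found"))
    
    df.withColumn("genderName",getGenderName(genderMap)(col("genderKey"))).show()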
    //+---+-------+---+----------+---------+----------+
    //| id|   name|age|      date|genderKey|genderName|
    //+---+-------+---+----------+---------+----------+
    //|  1|Rodrigo| 30|2019-01-01|     male| Masculine|
    //|  1|Rodrigo| 30|2019-01-01|   female|  Feminine|
    //|  1|Rodrigo| 30|2019-01-01|      mal| not found|
    //+---+-------+---+----------+---------+----------+
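
    The lookup inside the UDF is ordinary Scala, so it can be checked without Spark. The sketch below is an illustration and not part of the original answer: GenderLookup, lookupGender and applyFailsOnUnknownKey are hypothetical names. It shows the same getOrElse fallback with the null case made explicit, and that calling the Map directly, as in the question's genderMap(gender), throws for an unknown key.

```scala
import scala.util.Try

// Hypothetical helper object (an assumption, not from the original answer).
object GenderLookup {
  val genderMap: Map[String, String] = Map(
    "male"   -> "Masculine",
    "female" -> "Feminine"
  )

  // Same fallback as the answer's UDF body, with null handled explicitly.
  def lookupGender(gender: String): String =
    Option(gender)              // a null input becomes None
      .flatMap(genderMap.get)   // a key missing from the map becomes None
      .getOrElse("not found")   // fallback for both cases

  // Map.apply throws NoSuchElementException for a key that is not present.
  val applyFailsOnUnknownKey: Boolean = Try(genderMap("mal")).isFailure
}
```

    Assuming a Spark session is available, a function like this could presumably be wrapped the same way as in the answer, for example udf(GenderLookup.lookupGender _).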
    

    【Discussion】:
