【问题标题】:Define StructType as input datatype of a Function Spark-Scala 2.11 [duplicate]将 StructType 定义为 Function Spark-Scala 2.11 的输入数据类型 [重复]
【发布时间】:2020-03-17 08:20:21
【问题描述】:

我正在尝试在 scala 中编写 Spark UDF,我需要定义一个函数的输入数据类型

我有一个带有 StructType 的架构变量,如下所述。

import org.apache.spark.sql.types._

val relationsSchema = StructType(
      Seq(
        StructField("relation", ArrayType(
          StructType(Seq(
            StructField("attribute", StringType, true),
            StructField("email", StringType, true),
            StructField("fname", StringType, true),
            StructField("lname", StringType, true)
            )
          ), true
        ), true)
      )
    )

我正在尝试编写如下所示的函数

val relationsFunc: Array[Map[String,String]] => Array[String] = _.map(do something)
val relationUDF = udf(relationsFunc)

input.withColumn("relation",relationUDF(col("relation")))

上面的代码抛出下面的异常

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(relation)' due to data type mismatch: argument 1 requires array<map<string,string>> type, however, '`relation`' is of array<struct<attribute:string,email:string,fname:string,lname:string>> type.;;
'Project [relation#89, UDF(relation#89) AS proc#273]

如果我将输入类型指定为

val relationsFunc: StructType =&gt; Array[String] =

我无法实现逻辑,因为 _.map 给了我元数据、文件名等。

请建议如何在以下函数中将关系模式定义为输入数据类型。

val relationsFunc: ? => Array[String] = _.map(somelogic)

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您的关系结构是Row,因此您的函数应具有以下签名:

    val relationsFunc: Array[Row] => Array[String]
    

    然后您可以按位置或名称访问您的数据,即:

    {r:Row => r.getAs[String]("email")}
    

    【讨论】:

    • 这给了我一个 Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lorg.apache.spark.sql.Row;但是我使用了 Seq 而不是 Array,这很有效!谢谢
    【解决方案2】:

    查看文档中的映射表,确定 Spark SQL 和 Scala 之间的数据类型表示:https://spark.apache.org/docs/2.4.4/sql-reference.html#data-types

    您的relation 字段是StructType 类型的Spark SQL 复杂类型,它由Scala 类型org.apache.spark.sql.Row 表示,因此这是您应该使用的输入类型。

    我使用您的代码创建了这个提取 email 值的完整工作示例:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    val relationsSchema = StructType(
      Seq(
        StructField("relation", ArrayType(
          StructType(
            Seq(
              StructField("attribute", StringType, true),
              StructField("email", StringType, true),
              StructField("fname", StringType, true),
              StructField("lname", StringType, true)
            )
          ), true
        ), true)
      )
    )
    
    val data = Seq(
      Row("{'relation':[{'attribute':'1','email':'johnny@example.com','fname': 'Johnny','lname': 'Appleseed'}]}")
    )
    
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data),
      relationsSchema
    )
    
    val relationsFunc = (relation: Array[Row]) => relation.map(_.getAs[String]("email"))
    val relationUdf = udf(relationsFunc)
    
    df.withColumn("relation", relationUdf(col("relation")))
    

    【讨论】:

      猜你喜欢
      • 2019-08-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多