[Question title]: Issue with inferring the datatype of a complex struct field in Spark
[Posted]: 2021-02-22 16:56:43
[Question description]:

I have a Spark DataFrame like the one shown below. Its zipped_feature column holds an array of nested array-like (struct) values.

+--------------------+
|zipped_feature      |
+--------------------+
|[[A, 1], [ABC, 33]] |
|[[A, 1], [ABS, 24]] |
|[[B, 2], [ABE, 17]] |
|[[C, 3], [ABC, 33]] |
+--------------------+

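I am trying to fetch one item (which is itself an array) from this nested array by index, using the UDF below. For example, with index 0 on the first row, I expect to get "[A, 1]" back as an array.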

val getValueUdf = udf { (zippedFeature: Seq[Seq[String]], index: Int) => zippedFeature(index) }

But I get this error:

data type mismatch: argument 1 requires array<array<string>> type, however, '`zipped_feature`' is of array<struct<_1:string,_2:string>> type.

When I print the schema, it looks like this:

 |-- zipped_feature: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

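Can someone help me work out what I am doing wrong here? I want to get the value at a given index (again, as an array).

[Question discussion]:

    Tags: scala apache-spark apache-spark-sql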


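    [Solution 1]:

    zipped_feature is a column of type array<struct<_1:string,_2:string>>, so Spark passes each element to a UDF as a Row, not as a Seq[String]. To get the fields of each nested struct back as an array, change the UDF as shown below.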

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._
    
    // constructing sample dataframe
    val rows=
        List(Row(Array(Row("A","1"),Row("ABC","33"))),
        Row(Array(Row("A","1"),Row("ABS","24"))),
        Row(Array(Row("B","2"),Row("ABE","17"))),
        Row(Array(Row("C","3"),Row("ABC","33"))))
    val rdd=spark.sparkContext.parallelize(rows)
    
    val schema=new StructType().add("zipped_feature",ArrayType(new StructType().add("_1",StringType).add("_2",StringType)))
    val df=spark.createDataFrame(rdd,schema)
    df.show()
    /*
    +-------------------+
    |     zipped_feature|
    +-------------------+
    |[[A, 1], [ABC, 33]]|
    |[[A, 1], [ABS, 24]]|
    |[[B, 2], [ABE, 17]]|
    |[[C, 3], [ABC, 33]]|
    +-------------------+
    */
    df.printSchema()
    /*
    root
    |-- zipped_feature: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- _1: string (nullable = true)
    |    |    |-- _2: string (nullable = true)
    */
    
    // UDF: each array element arrives as a Row (struct), so the selected Row is converted to Seq[String]
    val getValueUdf = udf { (zippedFeature: Seq[Row], index: Int) =>
      zippedFeature(index).toSeq.map(_.toString)
    }

    df.withColumn("first_column", getValueUdf('zipped_feature, lit(0)))
      .withColumn("second_column", getValueUdf('zipped_feature, lit(1)))
      .show(false)
     
     /* output
     +-------------------+------------+-------------+
     |zipped_feature     |first_column|second_column|
     +-------------------+------------+-------------+
     |[[A, 1], [ABC, 33]]|[A, 1]      |[ABC, 33]    |
     |[[A, 1], [ABS, 24]]|[A, 1]      |[ABS, 24]    |
     |[[B, 2], [ABE, 17]]|[B, 2]      |[ABE, 17]    |
     |[[C, 3], [ABC, 33]]|[C, 3]      |[ABC, 33]    |
     +-------------------+------------+-------------+
    
    
    */
    

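    [Discussion]:

      [Solution 2]:

      As far as I can tell, you don't need a user-defined function for this use case. A plain withColumn or select statement is enough to do the job.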

      //Source data
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types._
      import spark.implicits._
      val df = Seq((Seq(Array("A","1"),Array("ABC","33"))),(Seq(Array("A","1"),Array("ABS","24")))).toDF("zipped_feature")
      // 1) getting the value using select statements
      val df1 = df.select($"zipped_feature"(0).as("ArrayZero"),$"zipped_feature"(1).as("ArrayOne"))
      // 2) getting the values using withColumn
      val df2 = df.withColumn("Array_Zero",$"zipped_feature"(0)).withColumn("Array_One",$"zipped_feature"(1))
      // 3) Getting the value of the Inner array
      val df3 = df1.select($"ArrayZero"(0).as("InnerArrayZero"))
      // 4) Getting the value of the first element
      val value = df1.select($"ArrayZero"(0)).first.getString(0)
      


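      [Discussion]:

      • Thanks for your input. I want to use a UDF because the "zipped_feature" column can hold any number of arrays, and I need to add a column for each array dynamically.
      [Solution 3]:

      According to the error message, the zipped_feature column is an array of structs, not an array of arrays. You don't need a UDF to access an array element by index; you can use any of the following: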

      col("zipped_feature")(idx) // opt1, 0-based index
      col("zipped_feature").getItem(idx)  // opt2, 0-based index
      element_at(col("zipped_feature"), idx + 1) // opt3, element_at uses 1-based indices
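
The three options can be compared side by side with a small spark-shell style sketch. The sample data and the column aliases (by_apply, by_getItem, by_element_at) are made up for illustration; the point is that each call returns the struct itself, and that element_at counts from 1 while the other two count from 0.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// A Seq of tuples is encoded as array<struct<_1:string,_2:string>>, like the question's column
val df = Seq(Seq(("A", "1"), ("ABC", "33"))).toDF("zipped_feature")

// All three columns select the first struct of the array
val picked = df.select(
  col("zipped_feature")(0).as("by_apply"),                  // 0-based
  col("zipped_feature").getItem(0).as("by_getItem"),        // 0-based
  element_at(col("zipped_feature"), 1).as("by_element_at")  // 1-based
)
picked.printSchema() // each column is a struct, not an array
val row = picked.head
```

Because the selected value is still a struct, a further step (such as the transform shown in this answer) is needed if an array<string> is required.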
      

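      To convert the array of structs into an array of arrays, you can use the transform function (a Spark SQL higher-order function, available since Spark 2.4):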

      val df1 = df.withColumn(
          "zipped_feature",
          expr("transform(zipped_feature, x -> array(x._1, x._2))")
        ).select(
          col("zipped_feature")(0).as("idx0"),
          col("zipped_feature")(1).as("idx1")
        )
      
      df1.show
      //+------+---------+
      //|  idx0|     idx1|
      //+------+---------+
      //|[A, 1]|[ABC, 33]|
      //|[A, 1]|[ABS, 24]|
      //|[B, 2]|[ABE, 17]|
      //|[C, 3]|[ABC, 33]|
      //+------+---------+
      
      df1.printSchema
      //root
      // |-- idx0: array (nullable = true)
      // |    |-- element: string (containsNull = true)
      // |-- idx1: array (nullable = true)
      // |    |-- element: string (containsNull = true)
      

      Or build the result directly, without transforming the array first:

      val df1 = df.select(
        expr("array(zipped_feature[0]._1, zipped_feature[0]._2)").as("idx0"),
        expr("array(zipped_feature[1]._1, zipped_feature[1]._2)").as("idx1")
      )
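
If the number of structs per row is not known in advance, as raised in the comment on Solution 2, the same column expressions can be generated in a loop instead of being written out by hand. The following is only a sketch under assumptions: the sample data, the idx0, idx1, ... column names and the maxLen helper value are illustrative, and computing maxLen runs an extra Spark job over the data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Rows with a different number of structs each (hypothetical sample data)
val df = Seq(
  Seq(("A", "1"), ("ABC", "33")),
  Seq(("B", "2"))
).toDF("zipped_feature")

// Longest array in the column; this triggers one aggregation job
val maxLen = df.agg(max(size($"zipped_feature"))).head.getInt(0)

// Add one array<string> column per position, idx0 .. idx(maxLen - 1)
val result = (0 until maxLen).foldLeft(df) { (acc, i) =>
  val item = $"zipped_feature".getItem(i)
  acc.withColumn(
    s"idx$i",
    // The size guard leaves the column null for rows that have no element at position i
    when(size($"zipped_feature") > i, array(item.getField("_1"), item.getField("_2")))
  )
}
result.show(false)
```

This keeps everything in built-in column expressions, so no UDF is involved; the trade-off is the extra pass needed to find the maximum array length.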
      

      [Discussion]:

        [Solution 4]:

        You can try the Dataset API's map method:

        def getValue(zippedFeature: Seq[(String, String)], index: Int): Seq[String] = {
            zippedFeature(index).productIterator.toList.toSeq.map(_.toString)
        }
        
        df.as[Seq[(String, String)]].map(x => (x, getValue(x, 0))).show
        +-------------------+------+
        |                 _1|    _2|
        +-------------------+------+
        |[[A, 1], [ABC, 33]]|[A, 1]|
        |[[A, 1], [ABS, 24]]|[A, 1]|
        |[[B, 2], [ABE, 17]]|[B, 2]|
        |[[C, 3], [ABC, 33]]|[C, 3]|
        +-------------------+------+
        

        [Discussion]:
