【问题标题】:Element position from nested DataFrame array (Spark 2.2)嵌套 DataFrame 数组中的元素位置 (Spark 2.2)
【发布时间】:2019-01-11 03:37:51
【问题描述】:

我正在尝试在 Spark Scala 中分解嵌套的 DataFrame。我有一个 DataFrame df,其中包含以下信息:

root 
|-- id: integer (nullable = false) 
|-- features: array (nullable = true) 
|   |-- element: float (containsNull = false)

我已经将数组信息分解为一个平面 DataFrame:

df.selectExpr("id","explode(features) as features")

并得到以下DataFrame:

id  features    
0   0.0629885
0   0.15931357
0   0.08922347

我的最终目标是转换数据并计算与该信息的一些相似性。为此,将每个 ID 的特征的实际位置放入 DataFrame 中会非常酷,如下所示:

  id  features    feature_pos
  0   0.0629885   0
  0   0.15931357  1
  0   0.08922347  2

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    使用posexplode 代替explode

    • 为每个元素在给定数组或映射列中的位置创建一个新行。
    • 与posexplode不同,如果数组/映射为null或为空,则生成行(null,null)。

    【讨论】:

      【解决方案2】:

      这里是poseexplode的例子。

      scala> val df = Seq((0, Seq(0.1f, 0.2f, 0.3f)),(1, Seq(0.4f, 0.5f, 0.6f))).toDF("id", "features")
      df: org.apache.spark.sql.DataFrame = [id: int, features: array<float>]
      
      scala> df.show(false)
      +---+---------------+
      |id |features       |
      +---+---------------+
      |0  |[0.1, 0.2, 0.3]|
      |1  |[0.4, 0.5, 0.6]|
      +---+---------------+
      

      注意df.withColumn("pos cols",posexplode('features)).show(false)会抛出错误,所以使用df.select()

      scala> df.select(posexplode('features)).show(false)
      +---+---+
      |pos|col|
      +---+---+
      |0  |0.1|
      |1  |0.2|
      |2  |0.3|
      |0  |0.4|
      |1  |0.5|
      |2  |0.6|
      +---+---+
      
      
      scala>
      

      默认名称是“pos”和“col”。您可以将它们重命名为

      scala> df.select(posexplode('features).as(Seq("a","b"))).show(false)
      +---+---+
      |a  |b  |
      +---+---+
      |0  |0.1|
      |1  |0.2|
      |2  |0.3|
      |0  |0.4|
      |1  |0.5|
      |2  |0.6|
      +---+---+
      
      
      scala>
      

      当你想展开并选择所有列时,使用

      scala> df.select(col("*"), posexplode('features).as( Seq("a","b")) ).show(false)
      +---+---------------+---+---+
      |id |features       |a  |b  |
      +---+---------------+---+---+
      |0  |[0.1, 0.2, 0.3]|0  |0.1|
      |0  |[0.1, 0.2, 0.3]|1  |0.2|
      |0  |[0.1, 0.2, 0.3]|2  |0.3|
      |1  |[0.4, 0.5, 0.6]|0  |0.4|
      |1  |[0.4, 0.5, 0.6]|1  |0.5|
      |1  |[0.4, 0.5, 0.6]|2  |0.6|
      +---+---------------+---+---+
      
      
      scala>
      

      【讨论】:

        【解决方案3】:

        您还可以通过UDF 应用Scala 的zipWithIndex,如下所示:

        val df = Seq(
          (0, Seq(0.1f, 0.2f, 0.3f)),
          (1, Seq(0.4f, 0.5f, 0.6f))
        ).toDF("id", "features")
        
        def addIndex = udf(
          (s: Seq[Float]) => s.zipWithIndex
        )
        
        val df2 = df.withColumn( "features_idx", explode(addIndex($"features")) )
        
        df2.select( $"id", $"features_idx._1".as("features"), $"features_idx._2".as("features_pos") ).show
        +---+--------+------------+
        | id|features|features_pos|
        +---+--------+------------+
        |  0|     0.1|           0|
        |  0|     0.2|           1|
        |  0|     0.3|           2|
        |  1|     0.4|           0|
        |  1|     0.5|           1|
        |  1|     0.6|           2|
        +---+--------+------------+
        

        【讨论】:

          猜你喜欢
          • 2019-08-19
          • 2020-11-08
          • 1970-01-01
          • 1970-01-01
          • 2013-03-24
          • 2021-05-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多