【Question title】: Spark SQL Column Manipulation
【Posted】: 2018-08-06 19:41:47
【Question description】:

I have a Dataset with the columns below.

df.show();

+--------+---------+---------+---------+---------+
|  Col1  |  Col2   | Expend1 | Expend2 | Expend3 |
+--------+---------+---------+---------+---------+
| Value1 | Cvalue1 |     123 |    2254 |      22 |
| Value1 | Cvalue2 |     124 |    2255 |      23 |
+--------+---------+---------+---------+---------+

I want to transform it into the following format using a join, a cube, or any other operation.

1.

+--------+---------+------+
| Value1 | Cvalue1 |  123 |
| Value1 | Cvalue1 | 2254 |
| Value1 | Cvalue1 |   22 |
| Value1 | Cvalue2 |  124 |
| Value1 | Cvalue2 | 2255 |
| Value1 | Cvalue2 |   23 |
+--------+---------+------+

Or this format, if it is better:

2.

+--------+---------+---------+------+
| Value1 | Cvalue1 | Expend1 |  123 |
| Value1 | Cvalue1 | Expend2 | 2254 |
| Value1 | Cvalue1 | Expend3 |   22 |
| Value1 | Cvalue2 | Expend1 |  124 |
| Value1 | Cvalue2 | Expend2 | 2255 |
| Value1 | Cvalue2 | Expend3 |   23 |
+--------+---------+---------+------+

Can I achieve either of the two formats above? And in the case of #1, can I also get the name of the column the last value came from, i.e. whether it is Expend1, Expend2, or Expend3?

【Question discussion】:

    Tags: apache-spark dataframe apache-spark-sql apache-spark-dataset


    【Solution 1】:

    You can combine the three columns into an array and then explode it:

    import org.apache.spark.sql.functions._
    
    // Pack the three columns into one array column and emit one row per element
    df.withColumn("Expend", explode(array("Expend1", "Expend2", "Expend3")))
      .drop("Expend1", "Expend2", "Expend3")
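
To see what explode(array(...)) does row by row, here is a plain-Scala sketch over the question's two sample rows that needs no Spark at all. The names InputRow, explodeValues, and sample are hypothetical, chosen for illustration only. Each input row fans out into one output row per array element, and the source column name is not carried along, which is exactly the gap the follow-up comment below asks about.

```scala
object ExplodeSketch {
  // Hypothetical in-memory stand-in for one row of the question's DataFrame
  final case class InputRow(col1: String, col2: String, expend1: Int, expend2: Int, expend3: Int)

  // Mirrors df.withColumn("Expend", explode(array(...))).drop(...):
  // one output row per array element; the source column name is lost
  def explodeValues(rows: Seq[InputRow]): Seq[(String, String, Int)] =
    rows.flatMap { r =>
      Seq(r.expend1, r.expend2, r.expend3).map(v => (r.col1, r.col2, v))
    }

  // The two sample rows from the question
  val sample: Seq[InputRow] = Seq(
    InputRow("Value1", "Cvalue1", 123, 2254, 22),
    InputRow("Value1", "Cvalue2", 124, 2255, 23)
  )

  def main(args: Array[String]): Unit =
    explodeValues(sample).foreach(println)
}
```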
    

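    To keep track of which column each value came from, you can prefix every value with its column name and split it off again after the explode: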

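    import org.apache.spark.sql.functions._
    import spark.implicits._ // enables the $"colName" syntax (assumes a SparkSession named spark)

    // Tag each value with its column name, e.g. "Expend1:123"
    df.withColumn("Expend1", concat_ws(":", lit("Expend1"), $"Expend1"))
      .withColumn("Expend2", concat_ws(":", lit("Expend2"), $"Expend2"))
      .withColumn("Expend3", concat_ws(":", lit("Expend3"), $"Expend3"))
      // One row per tagged value
      .withColumn("Expend", explode(array("Expend1", "Expend2", "Expend3")))
      .drop("Expend1", "Expend2", "Expend3")
      // Split the tag back into the column name and the value (now a string)
      .withColumn("ExpendColumn", split($"Expend", ":")(0))
      .withColumn("Expend", split($"Expend", ":")(1))
      .show(false)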
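
The concat_ws/split trick works because each value is tagged with its column name before the explode and untagged afterwards. Below is a plain-Scala sketch of that round trip; tag and untag are hypothetical helpers that only mimic what concat_ws and split do, not Spark functions. Two caveats carry over to the Spark version: the separator must never occur in a column name, and the recovered value is a string, so it may need a cast (for example .cast("int")) if you want numbers back.

```scala
object TagSketch {
  // Mirrors concat_ws(":", lit(name), col(name)): prefix a value with its column name
  def tag(name: String, value: Int): String = s"$name:$value"

  // Mirrors split($"Expend", ":")(0) and (1); the value comes back as a String.
  // Assumes ":" never appears in a column name.
  def untag(tagged: String): (String, String) = {
    val parts = tagged.split(":", 2)
    (parts(0), parts(1))
  }

  def main(args: Array[String]): Unit = {
    val tagged = Seq("Expend1" -> 123, "Expend2" -> 2254, "Expend3" -> 22)
      .map { case (n, v) => tag(n, v) }
    tagged.map(untag).foreach(println)
  }
}
```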
    

    Hope this helps!

    【Discussion】:

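    • This works, thanks Shanakar. But can I find out, for a given row, the name of the column the exploded element came from, i.e. whether it is Expend1, Expend2, or Expend3?
    • You can also use the stack function to get the column names, or attach the column name to each value and split it off later.
    【Solution 2】: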

    You can do this with the Hive function stack, which Spark SQL also provides as a built-in:

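    // stack(3, name1, value1, name2, value2, name3, value3) emits 3 rows of (name, value)
    // per input row; a triple-quoted string is needed because the SQL spans several lines
    df.selectExpr("col1",
                  "col2",
                  """stack(3, 'Expend1', Expend1,
                              'Expend2', Expend2,
                              'Expend3', Expend3)
                     as (Expend, Value)"""
                  ).show(false)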
    +------+-------+-------+-----+
    |col1  |col2   |Expend |Value|
    +------+-------+-------+-----+
    |Value1|Cvalue1|Expend1|123  |
    |Value1|Cvalue1|Expend2|2254 |
    |Value1|Cvalue1|Expend3|22   |
    |Value1|Cvalue2|Expend1|124  |
    |Value1|Cvalue2|Expend2|2255 |
    |Value1|Cvalue2|Expend3|23   |
    +------+-------+-------+-----+
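
Because stack takes the row count followed by alternating name/value arguments, the expression can be generated rather than typed by hand, which helps when there are many ExpendN columns. This is a minimal sketch that assumes a hypothetical helper named stackExpr and plain column names that need no backtick quoting; note also that stack generally expects the value columns to share the same type.

```scala
object StackExprSketch {
  // Builds "stack(n, 'c1', c1, 'c2', c2, ...) as (nameCol, valueCol)".
  // Assumes plain identifiers; names with spaces or dots would need backticks.
  def stackExpr(columns: Seq[String], nameCol: String = "Expend", valueCol: String = "Value"): String = {
    val pairs = columns.map(c => s"'$c', $c").mkString(", ")
    s"stack(${columns.size}, $pairs) as ($nameCol, $valueCol)"
  }

  def main(args: Array[String]): Unit =
    println(stackExpr(Seq("Expend1", "Expend2", "Expend3")))
}
```

With a DataFrame in scope, the generated string would be passed the same way as the hand-written one, e.g. df.selectExpr("col1", "col2", StackExprSketch.stackExpr(Seq("Expend1", "Expend2", "Expend3"))).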
    

    【Discussion】:

      【Solution 3】:

      Using a udf, you can build the second desired DataFrame:

      import org.apache.spark.sql.functions._

      val columns = df.select("Expend1", "Expend2", "Expend3").columns
      // Pairs each column name with the row's value; Spark encodes the Seq of tuples
      // as array<struct<_1: string, _2: string>>
      val arrayStructUdf = udf((columnNames: Seq[String], columnValues: Seq[String]) =>
        columnNames.zip(columnValues))
      

      Then call the udf, drop the three original columns, explode the newly created column, and finally select the required columns:

      // Values are cast to string to match the udf's Seq[String] parameter
      df.withColumn("new", arrayStructUdf(array(columns.map(x => lit(x)): _*),
                                          array(columns.map(c => col(c).cast("string")): _*)))
        .drop("Expend1", "Expend2", "Expend3")
        .withColumn("new", explode(col("new")))
        .select("Col1", "Col2", "new.*")
      

      You should get the second required DataFrame:

      +------+-------+-------+----+
      |Col1  |Col2   |_1     |_2  |
      +------+-------+-------+----+
      |Value1|Cvalue1|Expend1|123 |
      |Value1|Cvalue1|Expend2|2254|
      |Value1|Cvalue1|Expend3|22  |
      |Value1|Cvalue2|Expend1|124 |
      |Value1|Cvalue2|Expend2|2255|
      |Value1|Cvalue2|Expend3|23  |
      +------+-------+-------+----+
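
The udf body itself is ordinary Scala: it zips the column names with the row's values into (name, value) pairs, and Spark encodes each Scala tuple as a struct whose fields are named _1 and _2, which is why those headers appear above. The same function can be checked without Spark; pairNames is a hypothetical name for it. For friendlier headers, the fields could be renamed after the select, e.g. with .withColumnRenamed("_1", "Expend").

```scala
object ZipSketch {
  // Same logic as the udf body: pair each column name with the row's value.
  // zip stops at the shorter input, so mismatched lengths silently drop entries.
  def pairNames(columnNames: Seq[String], columnValues: Seq[String]): Seq[(String, String)] =
    columnNames.zip(columnValues)

  def main(args: Array[String]): Unit =
    pairNames(Seq("Expend1", "Expend2", "Expend3"), Seq("123", "2254", "22")).foreach(println)
}
```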
      

      【Discussion】:

        【Solution 4】:

        You can use the map function and then explode:

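        import org.apache.spark.sql.Column
        import org.apache.spark.sql.functions._
        import spark.implicits._ // toDF and the $"colName" syntax (assumes a SparkSession named spark)

        val data = List(
          ("Value1", "Cvalue1", 123, 2254, 22),
          ("Value1", "Cvalue2", 124, 2255, 23)
        )
        val df = data.toDF("Col1", "Col2", "Expend1", "Expend2", "Expend3")

        // Unpivot: build alternating key/value arguments for map(),
        // i.e. lit("Expend1"), col("Expend1"), lit("Expend2"), col("Expend2"), ...
        val unpivotedColumns = List("Expend1", "Expend2", "Expend3")
        val columnMapping: List[Column] = unpivotedColumns.flatMap(c => List(lit(c), col(c)))
        val mapped = df.select($"Col1", $"Col2", map(columnMapping: _*).alias("result"))
        // Exploding a map column yields one row per entry, in columns named key and value
        val result = mapped.select($"Col1", $"Col2", explode($"result"))
        result.show(false)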
        

        The result is:

        +------+-------+-------+-----+
        |Col1  |Col2   |key    |value|
        +------+-------+-------+-----+
        |Value1|Cvalue1|Expend1|123  |
        |Value1|Cvalue1|Expend2|2254 |
        |Value1|Cvalue1|Expend3|22   |
        |Value1|Cvalue2|Expend1|124  |
        |Value1|Cvalue2|Expend2|2255 |
        |Value1|Cvalue2|Expend3|23   |
        +------+-------+-------+-----+
        

        【Discussion】:
