【Question Title】: Spark large DataFrame: add new columns based on other column values
【Posted】: 2021-04-01 12:42:06
【Question】:

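I'm new to this and am working with a large dataset of about 20 GB (spread across many small files). I need help transforming the data into the following format.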

My data currently looks like this:

+----------+-------------------------+-------------------+---------+------+
|   id     |       values            |     creation date | leadTime| span |
+----------+-------------------------+-------------------+---------+------+
|id_1      |[[v1, 0.368], [v2, 0.5]] |     2020-07-15    |      16 |  15  |
|id_2      |[[v1, 0.368], [v2, 0.4]] |     2020-07-15    |      16 |  15  |
|id_1      |[[v1, 0.468], [v2, 0.3]] |     2020-07-15    |      17 |  18  |
|id_2      |[[v1, 0.368], [v2, 0.3]] |     2020-07-15    |      17 |  18  | 
+----------+-------------------------+-------------------+---------+------+

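I need the data in the following format, built from the values in those columns:

Each new column's name combines the key from values with the leadTime and span values of the row.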
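The target layout is a long-to-wide reshape: every map entry in a row becomes one cell whose column name is final_<key>_<leadTime>_<span>_wk, and all rows sharing the same (id, creation date) collapse into one output row. The minimal sketch below shows only that logic with plain Scala collections, not Spark. The Rec case class and the LongToWide, toWide and colName names are hypothetical, and modelling values as a Map[String, Double] follows the sample DataFrame in the question.

```scala
// Hypothetical record mirroring one input row; values is modelled as a Map, as in the sample DataFrame.
final case class Rec(id: String, values: Map[String, Double], creationDate: String, leadTime: Int, span: Int)

object LongToWide {
  // Derived column name: map key plus the row's leadTime and span.
  def colName(key: String, leadTime: Int, span: Int): String =
    s"final_${key}_${leadTime}_${span}_wk"

  // Group rows by (id, creationDate) and merge every map entry into one wide "row" (column name -> value).
  def toWide(rows: Seq[Rec]): Map[(String, String), Map[String, Double]] =
    rows
      .groupBy(r => (r.id, r.creationDate))
      .map { case (k, group) =>
        k -> group.flatMap { r =>
          r.values.map { case (key, v) => colName(key, r.leadTime, r.span) -> v }
        }.toMap
      }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Rec("id_1", Map("v1" -> 0.368, "v2" -> 0.5), "2020-07-15", 16, 15),
      Rec("id_2", Map("v1" -> 0.368, "v2" -> 0.4), "2020-07-15", 16, 15),
      Rec("id_1", Map("v1" -> 0.468, "v2" -> 0.3), "2020-07-15", 17, 18),
      Rec("id_2", Map("v1" -> 0.368, "v2" -> 0.3), "2020-07-15", 17, 18)
    )
    // Print one line per (id, creationDate) with its columns sorted by name.
    toWide(rows).toSeq.sortBy(_._1).foreach { case ((id, date), cols) =>
      println(s"$id $date " + cols.toSeq.sortBy(_._1).mkString(", "))
    }
  }
}
```

In Spark the same grouping and renaming is what groupBy followed by pivot does, as the answers below show.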

+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|   id     |creation date | final_v1_16_15_wk  |  final_v2_16_15_wk | final_v1_17_18_wk  |  final_v2_17_18_wk |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|id_1      |2020-07-15    |       0.368        |         0.5        |       0.468        |         0.3        |
|id_2      |2020-07-15    |       0.368        |         0.4        |       0.368        |         0.3        |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+

Here is a sample DataFrame:

import spark.implicits._ // provides toDF; already imported automatically in spark-shell
val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)).toDF("id", "values", "creation date", "leadTime", "span")

I tried to generate the column names and values with the following logic, but it didn't work:

val modDF = finalDF.withColumn("final_" + newFinalDF("values").getItem(0).getItem("_1") + "_" + newFinalDF("leadTime") + "_" + newFinalDF("span") + "_wk", $"values".getItem(0).getItem("_2"));
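A likely reason the attempt fails, sketched under the assumption of a local SparkSession: withColumn takes a single String name for the whole DataFrame, and an expression like "final_" + df("leadTime") is evaluated once on the driver by calling Column.toString, so it produces a description of the column expression rather than the per-row values 16 or 17. The object and method names WhyWithColumnFails and attempt are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object WhyWithColumnFails {
  // Returns the column name the attempt would use, plus the resulting DataFrame.
  def attempt(df: DataFrame): (String, DataFrame) = {
    // String + Column calls Column.toString once on the driver; no row data is read here.
    val name = "final_" + df("leadTime") + "_" + df("span") + "_wk"
    // Exactly one new column is added, with that single fixed name.
    (name, df.withColumn(name, col("leadTime")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("withcolumn-name").getOrCreate()
    import spark.implicits._
    val df = Seq(("id_1", 16, 15), ("id_1", 17, 18)).toDF("id", "leadTime", "span")
    val (name, out) = attempt(df)
    println(name) // one name describing the expressions, not final_16_15_wk or final_17_18_wk
    println(out.columns.mkString(", "))
    spark.stop()
  }
}
```

Turning row values into column names needs an operation that reads the data, such as pivot.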

【Discussion】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    You can use pivot for this.

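    import org.apache.spark.sql.functions._
    // explode_outer turns each map entry into its own row with "key" and "value" columns
    val explodeDf=df.select(col("id"),col("creation date"),explode_outer(col("values")),col("leadTime"),col("span"))
    // build the target column name, e.g. final_v1_16_15_wk, from key, leadTime and span
    val finalDf=explodeDf.select(col("id"),col("creation date"),col("value"),concat(lit("final_"),col("key"),lit("_"),col("leadTime"),lit("_"),col("span"),lit("_wk")).as("colDerived"))
    // one row per (id, creation date); each distinct colDerived value becomes a column
    // (sum returns the single value as long as each cell has exactly one entry)
    finalDf.groupBy(col("id"),col("creation date")).pivot(col("colDerived")).agg(sum(col("value"))).show()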
    
    +----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |  id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
    +----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |id_1|   2020-07-15|            0.368|            0.564|              0.5|             0.78|              0.6|             0.65|
    |id_2|   2020-07-15|            0.468|            0.657|              0.3|             0.65|             0.66|             0.67|
    +----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
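A variant of the same explode + pivot approach, as a sketch rather than the answerer's code: it builds the derived name with format_string instead of a chain of concat/lit calls, and aggregates with first, which returns one value per cell instead of adding duplicates together as sum would (with exactly one value per cell both give the same result). It uses explode rather than explode_outer, so rows with an empty or null map are dropped. A local SparkSession is assumed, and the object name PivotFormatString is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, first, format_string}

object PivotFormatString {
  def reshape(df: DataFrame): DataFrame =
    df.select(col("id"), col("creation date"), col("leadTime"), col("span"), explode(col("values")))
      // explode on a map column yields two columns named "key" and "value"
      .withColumn("colDerived", format_string("final_%s_%d_%d_wk", col("key"), col("leadTime"), col("span")))
      .groupBy(col("id"), col("creation date"))
      // each distinct colDerived value becomes one output column
      .pivot("colDerived")
      .agg(first(col("value")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pivot-format-string").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("id_1", Map("v1" -> 0.368, "v2" -> 0.5), "2020-07-15", 16, 15),
      ("id_1", Map("v1" -> 0.564, "v2" -> 0.78), "2020-07-15", 17, 18),
      ("id_2", Map("v1" -> 0.468, "v2" -> 0.3), "2020-07-15", 16, 15),
      ("id_2", Map("v1" -> 0.657, "v2" -> 0.65), "2020-07-15", 17, 18)
    ).toDF("id", "values", "creation date", "leadTime", "span")
    reshape(df).orderBy("id").show()
    spark.stop()
  }
}
```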
    

    See also: How to pivot Spark DataFrame?

    【Discussion】:

    • Thanks, I didn't know about the pivot function.
    【Solution 2】:

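    You can use pivot after transforming the values so that they match the desired column headers. Pivot performs better if you supply the list of pivot values up front; otherwise Spark first has to run a distinct over the pivot column to work out which output columns to create.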

    import org.apache.spark.sql.functions._
    import spark.implicits._ // toDF and the $"..." syntax; already imported in spark-shell
    val df = Seq(
      ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
      ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
      ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
      ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)).toDF("id", "values", "creation date", "leadTime", "span")
    
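    // explode the map into key/value rows, then turn key, leadTime and span into the pieces of the column name
    val df2 = df.select($"id",explode_outer($"values"),$"creation date", $"leadTime", $"span")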
    .withColumn("keys", concat(lit("final_"), col("key")))
    .withColumn("leadTimes", concat(lit("_"), col("leadTime"),lit("_")))
    .withColumn("spans", concat(col("span"),lit("_wk")))
    .drop("leadTime","key","span")
    .withColumnRenamed("keys","key").withColumnRenamed("leadTimes","leadTime").withColumnRenamed("spans","span")
    
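    // pivot on the concatenated name, e.g. final_v1_16_15_wk; first() takes the single value in each cell
    val df3 = df2.groupBy($"id",$"creation date").pivot(concat($"key",$"leadTime",$"span")).agg(first("value"))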
    df3.show()
    

    Output

    +----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |  id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
    +----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |id_1|   2020-07-15|            0.368|            0.564|              0.5|             0.78|              0.6|             0.65|
    |id_2|   2020-07-15|            0.468|            0.657|              0.3|             0.65|             0.66|             0.67|
    +----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    

    https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html
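The point about supplying the pivot values up front can be sketched like this: when the key and (leadTime, span) combinations are known in advance, they can be passed to pivot(column, values), which skips the extra job Spark otherwise runs to collect the distinct pivot values. The hard-coded names list is an assumption about the data: combinations missing from it are silently ignored, and listed combinations with no data come out as null. The integer columns are cast to string explicitly rather than relying on implicit coercion inside concat; the object name PivotExplicitValues is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat, explode, first, lit}

object PivotExplicitValues {
  // ASSUMPTION: all key / (leadTime, span) combinations are known in advance.
  // The strings must match the derived names exactly; values not listed here are ignored by pivot.
  val names: Seq[String] = for {
    key <- Seq("v1", "v2", "v3")
    (leadTime, span) <- Seq((16, 15), (17, 18))
  } yield s"final_${key}_${leadTime}_${span}_wk"

  def reshape(df: DataFrame): DataFrame =
    df.select(col("id"), col("creation date"), col("leadTime"), col("span"), explode(col("values")))
      .withColumn("colDerived", concat(
        lit("final_"), col("key"),
        lit("_"), col("leadTime").cast("string"),
        lit("_"), col("span").cast("string"), lit("_wk")))
      .groupBy(col("id"), col("creation date"))
      // Supplying the values avoids the separate distinct pass over colDerived.
      .pivot("colDerived", names)
      .agg(first(col("value")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pivot-explicit-values").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
      ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
      ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
      ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)
    ).toDF("id", "values", "creation date", "leadTime", "span")
    reshape(df).orderBy("id").show()
    spark.stop()
  }
}
```

Output columns follow the order of the names list, so the list also controls the column layout of the result.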

    【Discussion】:

    • Thanks, I didn't know about the pivot function.