【Question title】: Update a DataFrame struct column value in Spark Scala
【Posted】: 2021-11-23 23:07:31
【Question】:

I have the following DataFrame, with this schema:

root
 |-- col1: string (nullable = true)
 |-- col2: integer (nullable = true)
 |-- colStruct: struct (nullable = true)
 |    |-- subCol1: integer (nullable = true)
 |    |-- subCol2: string (nullable = true)
 |    |-- subCol3: integer (nullable = true)

How can I update the values of the subCol1 and subCol3 columns using UDFs?

【Comments】:

  • Do you want to update both columns with a single UDF, or use one UDF per column?
  • One UDF per column.

Tags: scala apache-spark pyspark


【Solution 1】:

Use . (dot) notation to access nested columns.

Here is an example:

Data

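// needed for .toDF; `spark` is the active SparkSession (predefined in spark-shell)
import spark.implicits._

case class Details(height: Integer, weight: Integer, sex: String) // height in cm, weight in lbs
case class Person(name: String, age: Integer, details: Details)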

println("The following is our dataset")
val data = Seq(
  Person("Darth Vader", 80, Details(180, 200, "male")),
  Person("Luke Skywalker", 25, Details(185, 180, "male")),
  Person("Obi-Wan Kenobe", 50, Details(175, 175, "male")),
  Person("Princess Leia", 23, Details(165, 150, "female")),
).toDF.cache()
data.show(5, false)

println("The schema of our data is:")
data.printSchema()


/*
The following is our dataset
+--------------+---+------------------+
|name          |age|details           |
+--------------+---+------------------+
|Darth Vader   |80 |{180, 200, male}  |
|Luke Skywalker|25 |{185, 180, male}  |
|Obi-Wan Kenobe|50 |{175, 175, male}  |
|Princess Leia |23 |{165, 150, female}|
+--------------+---+------------------+

The schema of our data is:
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- details: struct (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- weight: integer (nullable = true)
 |    |-- sex: string (nullable = true)
*/
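
As a quick illustration of dot notation on its own, the sketch below reads nested fields out of the struct. It assumes a local SparkSession and rebuilds a small copy of the example data; the names `DetailsRow`, `PersonRow`, and `people` are illustrative and not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical case classes mirroring the example schema
case class DetailsRow(height: Int, weight: Int, sex: String)
case class PersonRow(name: String, age: Int, details: DetailsRow)

// Assumption: a local session; in spark-shell this reuses the existing one
val spark = SparkSession.builder().master("local[*]").appName("dot-notation").getOrCreate()
import spark.implicits._

val people = Seq(
  PersonRow("Darth Vader", 80, DetailsRow(180, 200, "male")),
  PersonRow("Princess Leia", 23, DetailsRow(165, 150, "female"))
).toDF

// "details.height" resolves to the height field inside the details struct
val heights = people.select(col("name"), col("details.height").as("height"))
heights.show()

// "details.*" expands every field of the struct into top-level columns
val flattened = people.select(col("name"), col("details.*"))
flattened.printSchema()
```

Dot notation only reads a nested field. Writing one back still means rebuilding the struct, which is what the next part of the answer does.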

Updating nested columns

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// list out the columns you want to update using .(dot) notation
val allNestedColumnNamesToUpdate = Seq("details.height", "details.weight")
// list out all nested columns
val allNestedColumnNames = Seq("height", "weight", "sex")

// create your UDFs. Here we have created one for each integer nested column
val updateHeight = (value: Int) => { if (value < 180) 190 else 170 }
val updateWeight = (value: Int) => { if (value < 180) 190 else 170 }
// register UDFs
val updateHeightUDF = spark.udf.register("updateHeightUDF", updateHeight)
val updateWeightUDF = spark.udf.register("updateWeightUDF", updateWeight)

// Map the name of each nested column to update to its UDF
val columnNameToUpdateToUDFMap = Map (
  "details.height" -> updateHeightUDF,
  "details.weight" -> updateWeightUDF
)

val updatedDF = allNestedColumnNamesToUpdate.foldLeft(data)((acc, columnNameToUpdate) => {
  val updateUDF = columnNameToUpdateToUDFMap(columnNameToUpdate)
  // rebuild the struct: apply the UDF to the target field, pass the other fields through unchanged
  val updatedStructColumns = allNestedColumnNames.map(x => {
    if (s"details.$x" == columnNameToUpdate) updateUDF(col(columnNameToUpdate)).as(x)
    else col(s"details.$x")
  })
  // use the accumulator so that each update builds on the previous one
  acc.withColumn("details", struct(updatedStructColumns: _*))
})

updatedDF.show()
/*
+--------------+---+------------------+
|          name|age|           details|
+--------------+---+------------------+
|   Darth Vader| 80|  {170, 170, male}|
|Luke Skywalker| 25|  {170, 170, male}|
|Obi-Wan Kenobe| 50|  {190, 190, male}|
| Princess Leia| 23|{190, 190, female}|
+--------------+---+------------------+
*/
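
If the struct has many fields, the hard-coded list of nested column names can be read from the schema instead. The following is a minimal sketch of that idea, assuming the struct column is named `details` as above; the helper `updateStructFields`, the case classes, and the sample data are hypothetical and not part of the original answer.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, struct, udf}
import org.apache.spark.sql.types.StructType

// Hypothetical case classes mirroring the example schema
case class DetailsRow(height: Int, weight: Int, sex: String)
case class PersonRow(name: String, age: Int, details: DetailsRow)

// Assumption: a local session; in spark-shell this reuses the existing one
val spark = SparkSession.builder().master("local[*]").appName("struct-udf").getOrCreate()
import spark.implicits._

// Hypothetical helper: rebuild the struct `structName`, applying a UDF to
// every field that has an entry in `updates` (keyed by bare field name)
def updateStructFields(df: DataFrame, structName: String,
                       updates: Map[String, UserDefinedFunction]): DataFrame = {
  // Read the field names from the schema instead of listing them by hand
  val fieldNames = df.schema(structName).dataType.asInstanceOf[StructType].fieldNames
  val rebuilt: Seq[Column] = fieldNames.toSeq.map { field =>
    val original = col(s"$structName.$field")
    updates.get(field).map(f => f(original)).getOrElse(original).as(field)
  }
  df.withColumn(structName, struct(rebuilt: _*))
}

val people = Seq(
  PersonRow("Darth Vader", 80, DetailsRow(180, 200, "male")),
  PersonRow("Princess Leia", 23, DetailsRow(165, 150, "female"))
).toDF

// Same rule as the UDFs above: values below 180 become 190, all others 170
val flip = udf((value: Int) => if (value < 180) 190 else 170)
val updated = updateStructFields(people, "details", Map("height" -> flip, "weight" -> flip))
updated.show(false)
```

Only the fields that need a UDF are named in the map; every other field is copied through in schema order, so the struct keeps its original layout.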

Note: UDFs are not recommended, because they are opaque to Spark's optimizer.
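
Given that caveat, here is a sketch of the same update written with built-in column functions only, which the optimizer can see through. It assumes Spark 3.1 or later, where `Column.withField` is available; on older versions the same `when`/`otherwise` expressions can be placed inside `struct(...)` instead. The case classes, the `flip` helper, and the sample data are illustrative.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, when}

// Hypothetical case classes mirroring the example schema
case class DetailsRow(height: Int, weight: Int, sex: String)
case class PersonRow(name: String, age: Int, details: DetailsRow)

// Assumption: a local session; in spark-shell this reuses the existing one
val spark = SparkSession.builder().master("local[*]").appName("with-field").getOrCreate()
import spark.implicits._

val people = Seq(
  PersonRow("Darth Vader", 80, DetailsRow(180, 200, "male")),
  PersonRow("Princess Leia", 23, DetailsRow(165, 150, "female"))
).toDF

// Same rule as the UDFs: values below 180 become 190, all others become 170
def flip(field: String): Column = when(col(field) < 180, 190).otherwise(170)

// withField replaces one field of the struct and leaves the others untouched
val updated = people.withColumn(
  "details",
  col("details")
    .withField("height", flip("details.height"))
    .withField("weight", flip("details.weight"))
)
updated.show(false)
```

This only works when the update rule can be expressed with built-in functions; logic that really needs arbitrary Scala code still calls for a UDF.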

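【Comments】:

  • I'm not sure you understood my question. I want to update the values of nested columns using UDFs.
  • vkt, I have updated the answer. Let me know whether this answers your question better.
  • My list of columns is very long, so I can't list them all out like this: val data2 = data.withColumn("details", struct(lit(updateWeightUDF($"name", $"details.sex")).as("weight"), $"details.height", $"details.sex")).
  • @vkt I have updated the answer to do this programmatically. Hope that helps.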