【问题标题】:Change value of nested column in DataFrame更改 DataFrame 中嵌套列的值
【发布时间】:2019-12-11 21:04:55
【问题描述】:

我有两级嵌套字段的数据框

 root
 |-- request: struct (nullable = true)
 |    |-- dummyID: string (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- fooID: string (nullable = true)
 |    |    |-- barID: string (nullable = true)

我想在此处更新fooId 列的值。我能够更新第一级的值,例如 dummyID 列在这里使用这个问题作为参考 How to add a nested column to a DataFrame

输入数据:

{
    "request": {
        "dummyID": "test_id",
        "data": {
            "fooID": "abc",
            "barID": "1485351"
        }
    }
}

输出数据:

{
    "request": {
        "dummyID": "test_id",
        "data": {
            "fooID": "def",
            "barID": "1485351"
        }
    }
}

我怎样才能使用 Scala 来做到这一点?

【问题讨论】:

  • 这只有一个json字符串吗?或者你有很多这样的数据。我猜火花对于小数据不会有成果。请更新您如何将 json 转换为数据框
  • 是的,这里只有一个json字符串。不幸的是,我这里必须使用spark。数据输出也需要在 json 中。上面的dataframe转换只是printSchema()testDf = sparkSession.read.json("fileLoc") testDf.printSchema()
  • 我不会回答这个问题,因为它对 spark 的使用效率很低,但你可以这样做df.withColumn("request", struct(struct(lit("def").as("fooID"), col("request.data.barID").as("barID")).as("data"), col("request.dummyID").as("dummyID"))) 祝你好运:)

标签: scala apache-spark dataframe


【解决方案1】:

以下是此问题的通用解决方案,它可以根据递归遍历中应用的任意函数在任何级别更新任意数量的嵌套值:

def mutate(df: DataFrame, fn: Column => Column): DataFrame = {
  // Get a projection with fields mutated by `fn` and select it
  // out of the original frame with the schema reassigned to the original
  // frame (explained later)
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn):_*).rdd, df.schema)
}

def traverse(schema: StructType, fn: Column => Column, path: String = ""): Array[Column] = {
  schema.fields.map(f => {
    f.dataType match {
      case s: StructType => struct(traverse(s, fn, path + f.name + "."): _*)
      case _ => fn(col(path + f.name))
    }
  })
}

这实际上等同于通常的“将整个结构重新定义为投影”解决方案,但它使用原始结构自动重新嵌套字段并保留可空性/元数据(当您手动重新定义结构时会丢失)。令人讨厌的是,在创建投影(afact)时无法保留这些属性,因此上面的代码手动重新定义了架构。

一个示例应用程序:

case class Organ(name: String, count: Int)
case class Disease(id: Int, name: String, organ: Organ)
case class Drug(id: Int, name: String, alt: Array[String])

val df = Seq(
  (1, Drug(1, "drug1", Array("x", "y")), Disease(1, "disease1", Organ("heart", 2))),
  (2, Drug(2, "drug2", Array("a")), Disease(2, "disease2", Organ("eye", 3)))
).toDF("id", "drug", "disease")

df.show(false)

+---+------------------+-------------------------+
|id |drug              |disease                  |
+---+------------------+-------------------------+
|1  |[1, drug1, [x, y]]|[1, disease1, [heart, 2]]|
|2  |[2, drug2, [a]]   |[2, disease2, [eye, 3]]  |
+---+------------------+-------------------------+

// Update the integer field ("count") at the lowest level:
val df2 = mutate(df, c => if (c.toString == "disease.organ.count") c - 1 else c)
df2.show(false)

+---+------------------+-------------------------+
|id |drug              |disease                  |
+---+------------------+-------------------------+
|1  |[1, drug1, [x, y]]|[1, disease1, [heart, 1]]|
|2  |[2, drug2, [a]]   |[2, disease2, [eye, 2]]  |
+---+------------------+-------------------------+

// This will NOT necessarily be equal unless the metadata and nullability
// of all fields is preserved (as the code above does)
assertResult(df.schema.toString)(df2.schema.toString)

这样做的一个限制是它不能添加新字段,只能更新现有字段(尽管地图可以更改为 flatMap 和返回 Array[Column] 的函数,如果您不关心保留可空性/元数据)。

此外,这里有一个更通用的 Dataset[T] 版本:

case class Record(id: Int, drug: Drug, disease: Disease)

def mutateDS[T](df: Dataset[T], fn: Column => Column)(implicit enc: Encoder[T]): Dataset[T] = {
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn):_*).rdd, enc.schema).as[T]
}

// To call as typed dataset:
val fn: Column => Column = c => if (c.toString == "disease.organ.count") c - 1 else c
mutateDS(df.as[Record], fn).show(false)

// To call as untyped dataset:
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) // This is necessary regardless of sparkSession.implicits._ imports
mutateDS(df, fn).show(false)

【讨论】:

  • 这真的很有帮助,但是我在将 rdd 转换为 df 时遇到 Java 堆内存不足错误。你能帮我解决一下吗? df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn):_*).rdd, df.schema)
  • 任何人都可以将第一个解决方案翻译成 Java
  • 如何更改字段的数据类型而不是值?
  • 如何在 python 中实现这一点?谢谢
【解决方案2】:

一种方法,虽然很麻烦,但通过显式引用原始结构的每个元素来完全解包并重新创建列。

dataFrame.withColumn("person", 
    struct(
        col("person.age").alias("age),
        struct(
            col("person.name.first").alias("first"),
            lit("some new value").alias("last")).alias("name")))

【讨论】:

    猜你喜欢
    • 2021-06-17
    • 2018-11-01
    • 2015-05-03
    • 2020-04-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-14
    • 1970-01-01
    相关资源
    最近更新 更多