【问题标题】:Filter array of struct in spark dataframeSpark数据框中的结构过滤器数组
【发布时间】:2018-08-01 06:17:05
【问题描述】:

我有一个 JSON 文件,我正在使用 Scala 2.10 将其读入 Spark 数据帧

val df = sqlContext.read.json("file_path")

JSON 如下所示:

{ "data": [{ "id":"20180218","parent": [{"name": "Market"}]}, { "id":"20180219","parent": [{"name": "Client"},{"name": "Market" }]}, { "id":"20180220","parent": [{"name": "Client"}]},{ "id":"20180221","parent": []}]}

data 是一个结构数组。每个结构再次具有父键。 Parent 又是一个结构数组,可以保存 0 个或多个值。

我需要过滤父数组,使其仅包含名称为“Market”或没有名称的结构。我的输出应该是这样的:

{ "data": [{ "id":"20180218","parent": [{"name": "Market"}]}, { "id":"20180219","parent": [{"name": "Market" }]}, { "id":"20180220","parent": []},{ "id":"20180221","parent": []}]}

因此,基本上过滤掉所有名称不是“Market”的结构,并保留空的父数组(作为操作的结果,或者如果它已经为空)。

有人可以帮忙吗?

谢谢

【问题讨论】:

  • 到目前为止你尝试过什么?你能分享一些你尝试过的代码示例吗?

标签: scala spark-dataframe


【解决方案1】:

我们需要使用explode函数来实现这种嵌套的JSON结构和数组查询。

scala> val df = spark.read.json("/Users/pavithranrao/Desktop/test.json")

scala> df.printSchema
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)

scala> val oneDF = df.select(col("data"), explode(col("data"))).toDF("data", "element").select(col("data"), col("element.parent"))
scala> oneDF.show
"""
+--------------------+--------------------+
|                data|              parent|
+--------------------+--------------------+
|[[20180218,Wrappe...|          [[Market]]|
|[[20180218,Wrappe...|[[Client], [Market]]|
|[[20180218,Wrappe...|          [[Client]]|
|[[20180218,Wrappe...|                  []|
+--------------------+--------------------+
"""

scala> val twoDF = oneDF.select(col("data"), explode(col("parent"))).toDF("data", "names")
scala> twoDF.printSchema
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |-- names: struct (nullable = true)
 |    |-- name: string (nullable = true)

scala> twoDF.show
"""
+--------------------+--------+
|                data|   names|
+--------------------+--------+
|[[20180218,Wrappe...|[Market]|
|[[20180218,Wrappe...|[Client]|
|[[20180218,Wrappe...|[Market]|
|[[20180218,Wrappe...|[Client]|
+--------------------+--------+
"""

scala> import org.apache.spark.sql.functions.length

// Extract names struct that is Empty
scala> twoDF.select(length(col("names.name")) === 0).show
+------------------------+
|(length(names.name) = 0)|
+------------------------+
|                   false|
|                   false|
|                   false|
|                   false|
+------------------------+

// Extract names strcut that doesn't have Market
scala> twoDF.select(!col("names.name").contains("Market")).show()
+----------------------------------+
|(NOT contains(names.name, Market))|
+----------------------------------+
|                             false|
|                              true|
|                             false|
|                              true|
+----------------------------------+

// Combining these two

scala> val ansDF = twoDF.select("data").filter(!col("names.name").contains("Market") || length(col("names.name")) === 0)
scala> ansDF.printSchema

// Schema same as input df
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)

scala> ansDF.show(false)
+----------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------------------------------+
|[[20180218,WrappedArray([Market])], [20180219,WrappedArray([Client], [Market])], [20180220,WrappedArray([Client])], [20180221,WrappedArray()]]|
|[[20180218,WrappedArray([Market])], [20180219,WrappedArray([Client], [Market])], [20180220,WrappedArray([Client])], [20180221,WrappedArray()]]|
+----------------------------------------------------------------------------------------------------------------------------------------------+

最终的ansDF有满足条件name不包含'Market'或isEmpty的过滤记录。

PS:如果我错过了确切的过滤器场景,请从 上面代码中的过滤函数

希望这会有所帮助!

【讨论】:

  • 附带说明,如果您使用Spark v 2.0+,然后使用DataSet 代替DataFrames,并为嵌套结构提供适当的案例类,这可以很容易地实现。 DataSet 允许我们使用像filter 这样的RDD 操作,我们不需要使用explode 来达到结构或数组的峰值。
  • 嗨帕维特兰。这不能解决目的。由于不正确的取消引用,过滤条件不起作用。我试过twoDF.where(length(col("names.name")) === 0 || !col("names.name").contains("Market")).show,结果是+--------------------+--------+ | data| names| +--------------------+--------+ |[[20180218,Wrappe...| [Client]| |[[20180218,Wrappe...| [Client]| +--------------------+--------+
  • twoDF.where(length(col("names.name")) === 0 || !col("names.name").contains("Market")) 给出了正确的结果吗?如果需要,请编辑答案。
  • 不,它没有。它只返回具有父名称的行作为客户端并过滤掉空值/空父。
  • 这可能来得有点晚,但使用过滤器应该是答案。您只需要如何从列中提取包含市场的“名称”。这是你应该做的。 ``` df.withColumn("filtered_by_market", (c: Column) => c.apply("name") === "Market") ``` 就这么简单..
【解决方案2】:

假设您有一个包含结构数组的列,在您的情况下为parent,您需要做的是使用函数filter。我相信有些人已经说过了。诀窍是过滤功能需要在结构上工作。

根据文档https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html#apply(extraction:Any):org.apache.spark.sql.Column你可以

从复杂类型中提取一个或多个值。支持以下类型的提取:

  • 给定一个数组,整数序数可用于检索单个值。
  • 给定一个 Map,可以使用正确类型的键来检索单个值。
  • 给定一个 Struct,可以使用字符串 fieldName 来提取该字段。
  • 给定一个结构数组,字符串 fieldName 可用于提取该数组中每个结构的字段,并返回一个字段数组。

因此过滤很简单:

df.withColumn("filtered", filter(col("parent"), (c: Column) => c.apply("name") === "Market")

我相信这是最有效和最干净的方式。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-05-16
    • 2021-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-24
    • 1970-01-01
    相关资源
    最近更新 更多