【问题标题】:Pyspark create DataFrame from rows/data with varying columnsPyspark 从具有不同列的行/数据创建 DataFrame
【发布时间】:2018-11-29 14:35:15
【问题描述】:

我有多个键/值对的数据/行,键的数量未知——有些重叠,有些没有——我想从中创建一个 Spark DataFrame。我的最终目标是从此 DataFrame 编写 CSV。

我对输入数据/行具有灵活性:它们最容易是 JSON 字符串,但可以通过 可能重叠的键进行转换:

{"color":"red", "animal":"fish"}
{"color":"green", "animal":"panda"}
{"color":"red", "animal":"panda", "fruit":"watermelon"}
{"animal":"aardvark"}
{"color":"blue", "fruit":"apple"}

理想情况下,我想根据这些数据创建一个如下所示的 DataFrame:

-----------------------------
color | animal   | fruit
-----------------------------
red   | fish     | null
green | panda    | null
red   | panda    | watermelon
null  | aardvark | null
blue  | null     | apple
-----------------------------

值得注意的是,没有特定键的数据/行是null,数据/行中的所有键都表示为列。

我对 Spark 的许多基础知识感到相对自在,但在设想一个有效地使用键/值对获取我的 RDD/DataFrame 的过程时遇到了困难 -- 但列和键的数量未知 -- 并创建一个以这些键为列的 DataFrame。

高效,因为如果可能,我想避免创建一个所有输入行都保存在内存中的对象(例如单个字典)。

再一次,编写 CSV 的最终目标是,我假设创建一个 DataFrame 是实现这一目标的合乎逻辑的步骤。

另一个皱纹:

一些数据将是多值的,例如:

{"color":"pink", "animal":["fish","mustang"]}
{"color":["orange","purple"], "animal":"panda"}

使用提供的分隔符,例如/ 为避免与 , 冲突以分隔列,我想在列的输出中分隔这些,例如:

------------------------------------
color         | animal       | fruit
------------------------------------
pink          | fish/mustang | null
orange/purple | panda        | null
------------------------------------

一旦有了解决主要问题的方法,我相信我可以解决这部分问题,但无论如何都会把它扔掉,因为这将是问题的一个维度。

【问题讨论】:

  • 你试过df = spark.read.json("myfile.json")。在你的第一个例子中似乎对我有用。 更新:它也适用于您的第二个示例,但将所有记录视为字符串,因此您必须执行一些 regex to convert the string representation of the list 以按照您想要的方式对其进行格式化。
  • 感谢@pault 的想法。我正要说它可能行不通,因为我的数据实际上来自 DataFrame,我正在将单列 XML 转换为 JSON 字符串。但这很有趣,我可以用 JSON 行创建一个 RDD,编写它,然后读取它?还是有另一种方法可以从 RDD 模拟 .json() 方法,而不是从读取外部位置?
  • 意识到read.json() 也可能接受 RDD,spark.apache.org/docs/latest/api/python/…,试一试...

标签: python json apache-spark pyspark apache-spark-sql


【解决方案1】:

从文件中读取

如果您的数据存储在一个文件中(假设它被命名为myfile.json),如下所示:

{"color":"red", "animal":"fish"}
{"color":"green", "animal":"panda"}
{"color":"red", "animal":"panda", "fruit":"watermelon"}
{"animal":"aardvark"}
{"color":"blue", "fruit":"apple"}
{"color":"pink", "animal":["fish","mustang"]}
{"color":["orange","purple"], "animal":"panda"}

您可以使用pyspark.sql.DataFrameReader.json 将文件读取为换行符分隔的 JSON 记录。

df = spark.read.json("myfile.json")
df.show()
#+------------------+-------------------+----------+
#|            animal|              color|     fruit|
#+------------------+-------------------+----------+
#|              fish|                red|      null|
#|             panda|              green|      null|
#|             panda|                red|watermelon|
#|          aardvark|               null|      null|
#|              null|               blue|     apple|
#|["fish","mustang"]|               pink|      null|
#|             panda|["orange","purple"]|      null|
#+------------------+-------------------+----------+

df.printSchema()
#root
# |-- animal: string (nullable = true)
# |-- color: string (nullable = true)
# |-- fruit: string (nullable = true)

从 RDD 读取

您也可以这样做来读取rdd

import json

rdd = sc.parallelize(
    map(
        json.dumps,
        [
            {"color":"red", "animal":"fish"},
            {"color":"green", "animal":"panda"},
            {"color":"red", "animal":"panda", "fruit":"watermelon"},
            {"animal":"aardvark"},
            {"color":"blue", "fruit":"apple"},
            {"color":"pink", "animal":["fish","mustang"]},
            {"color":["orange","purple"], "animal":"panda"}
        ]
    )
)

df = spark.read.json(rdd)

对于第二部分,您可以根据需要使用pyspark.sql.functions.regexp_replace 来格式化您的多值记录。

from pyspark.sql.functions import regexp_replace

def format_column(column):
    return regexp_replace(regexp_replace(column, '(^\[)|(\]$)|(")', ''), ",", "/") 

df.select(*[format_column(c).alias(c) for c in df.columns]).show()

#+------------+-------------+----------+
#|      animal|        color|     fruit|
#+------------+-------------+----------+
#|        fish|          red|      null|
#|       panda|        green|      null|
#|       panda|          red|watermelon|
#|    aardvark|         null|      null|
#|        null|         blue|     apple|
#|fish/mustang|         pink|      null|
#|       panda|orange/purple|      null|
#+------------+-------------+----------+

【讨论】:

  • 直到现在我才意识到使用spark.read.json 读取 RDD 是可能的,这打破了其余部分。非常感谢!用于分解多值的正则表达式语法相同(这就是我在非 Spark 上下文中这样做的方式)。
猜你喜欢
  • 1970-01-01
  • 2020-01-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-01
  • 2020-05-04
相关资源
最近更新 更多