【问题标题】:Split String in PySpark Dataframe在 PySpark 数据框中拆分字符串
【发布时间】:2018-12-03 04:54:12
【问题描述】:

我想在 PySpark 数据框中拆分一列,该列(字符串类型)如下所示:

[{"quantity":25,"type":"coins","balance":35}]
[{"balance":40,"type":"coins","quantity":25}]
[{"quantity":2,"type":"column_breaker","balance":2},{"quantity":2,"type":"row_breaker","balance":2},{"quantity":2,"type":"single_block_breaker","balance":2},{"quantity":1,"type":"rainbow","balance":1},{"quantity":135,"type":"coins","balance":140}]

所以他们中的一些人有一组"quantity, type, balance",而他们中的一些人有多个这样的条目。我尝试将其视为 JSON 变量并拆分:

schema = StructType(
[
    StructField('balance', StringType(), True),
    StructField('type', StringType(), True),
    StructField('quantity', StringType(), True)
 ]
 )

temp = merger.withColumn("data", 
from_json("items",schema)).select("items", col('data.*'))
display(temp)

但它只能将观察结果拆分为一组。我想要这样的输出

balance|quantity|type
   35  |   25   |coins
   40  |   25   |coins
.......

这样,一组观测值拆分为一个观测值,而多组观测值拆分为垂直放置的多个观测值。

另外,拆分成多行后,如何识别每个观察值?说,我有另一个变量是 ID,我该如何分配 ID?

【问题讨论】:

  • 你能分享一下你想要的结果吗?

标签: python json split pyspark apache-spark-sql


【解决方案1】:

如果每行有多个 JSON,则可以使用技巧将对象之间的逗号替换为换行符,并使用 explode 函数按换行符拆分。所以对于像这样的DF:

>>> df.show()
+-----------------+
|            items|
+-----------------+
|         {"a": 1}|
|{"a": 2},{"a": 3}|
+-----------------+

这段代码完成了这项工作:

>>> from pyspark.sql.types import ArrayType, StringType
>>> from pyspark.sql.functions import udf, explode
>>> split_jsons = lambda jsons: jsons.replace('},{', '}\n{').split('\n')
>>> df.withColumn('one_json_per_row', udf(split_jsons, ArrayType(StringType()))('items')) \
...    .select(explode('one_json_per_row').alias('item')).show()
+--------+
|    item|
+--------+
|{"a": 1}|
|{"a": 2}|
|{"a": 3}|
+--------+

那么就可以使用正则代码了

【讨论】:

  • 非常感谢 cmets。我收到一条错误消息:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 47911.0 中的任务 0 失败 4 次,最近一次失败:阶段 47911.0 中丢失任务 0.3(TID 4961002、10.45.197.163、执行者1993):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后):
【解决方案2】:

您可以使用 json 库并使用 rdd.flatMap() 将 json 字符串数组解析并分解为多行

import json

data = [("[{\"quantity\":25,\"type\":\"coins\",\"balance\":35}]",),
         ("[{\"balance\":40,\"type\":\"coins\",\"quantity\":25}]",),
    ("[{\"quantity\":2,\"type\":\"column_breaker\",\"balance\":2},{\"quantity\":2,\"type\":\"row_breaker\",\"balance\":2},{\"quantity\":2,\"type\":\"single_block_breaker\",\"balance\":2},{\"quantity\":1,\"type\":\"rainbow\",\"balance\":1},{\"quantity\":135,\"type\":\"coins\",\"balance\":140}]",)]

schema = StructType([StructField("items", StringType(), True)])
df = spark.createDataFrame(data,schema)

def transformRow(row):
    jsonObj = json.loads(row[0])
    rows = [Row(**item) for item in jsonObj]
    return rows

df.rdd.flatMap(transformRow).toDF().show()

输出

+-------+--------+--------------------+
|balance|quantity|                type|
+-------+--------+--------------------+
|     35|      25|               coins|
|     40|      25|               coins|
|      2|       2|      column_breaker|
|      2|       2|         row_breaker|
|      2|       2|single_block_breaker|
|      1|       1|             rainbow|
|    140|     135|               coins|
+-------+--------+--------------------+

【讨论】:

    猜你喜欢
    • 2022-11-20
    • 2021-02-27
    • 1970-01-01
    • 2019-02-12
    • 1970-01-01
    • 1970-01-01
    • 2021-12-13
    • 2020-01-02
    • 1970-01-01
    相关资源
    最近更新 更多