【问题标题】:Pyspark : Change nested column datatypePyspark:更改嵌套列数据类型
【发布时间】:2018-01-31 03:21:03
【问题描述】:

我们如何在 Pyspark 中更改嵌套列的数据类型?对于 rxample,如何将 value 的数据类型从 string 更改为 int?

参考:how to change a Dataframe column from String type to Double type in pyspark

{
    "x": "12",
    "y": {
        "p": {
            "name": "abc",
            "value": "10"
        },
        "q": {
            "name": "pqr",
            "value": "20"
        }
    }
}

【问题讨论】:

  • 1.此更改是否需要持久化,并将更改保存到 json 文件中?还是您在执行操作时需要精度?
  • @diek 需要白色写入 json 文件

标签: dictionary pyspark


【解决方案1】:

您可以使用读取 json 数据

from pyspark import SQLContext

sqlContext = SQLContext(sc)
data_df = sqlContext.read.json("data.json", multiLine = True)

data_df.printSchema()

输出

root
 |-- x: long (nullable = true)
 |-- y: struct (nullable = true)
 |    |-- p: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)
 |    |-- q: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)

现在您可以访问 y 列中的数据

data_df.select("y.p.name")
data_df.select("y.p.value")

输出

abc, 10

好的,解决方案是添加一个具有正确架构的新嵌套列并删除具有错误架构的列

from pyspark.sql.functions import *
from pyspark.sql import Row

df3 = spark.read.json("data.json", multiLine = True)

# create correct schema from old 
c = df3.schema['y'].jsonValue()
c['name'] = 'z'
c['type']['fields'][0]['type']['fields'][1]['type'] = 'long'
c['type']['fields'][1]['type']['fields'][1]['type'] = 'long'

y_schema = StructType.fromJson(c['type'])

# define a udf to populate the new column. Row are immuatable so you 
# have to build it from start.

def foo(row):
    d = Row.asDict(row)
    y = {}
    y["p"] = {}
    y["p"]["name"] = d["p"]["name"]
    y["p"]["value"] = int(d["p"]["value"])
    y["q"] = {}
    y["q"]["name"] = d["q"]["name"]
    y["q"]["value"] = int(d["p"]["value"])

    return(y)
map_foo = udf(foo, y_schema)

# add the column
df3_new  = df3.withColumn("z", map_foo("y"))

# delete the column
df4 = df3_new.drop("y")


df4.printSchema()

输出

root
 |-- x: long (nullable = true)
 |-- z: struct (nullable = true)
 |    |-- p: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)
 |    |-- q: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)


df4.show()

输出

+---+-------------------+
|  x|                  z|
+---+-------------------+
| 12|[[abc,10],[pqr,10]]|
+---+-------------------+

【讨论】:

  • @aswinids 我已经编辑了这个问题。对这个有什么想法吗?
  • @aswinids :感谢您的帮助。我们在 json 模式中有 decima/timestamp 数据类型吗?
  • @aswinids:如果我将 10 的值更改为“10”并使用类型:'long',我会得到 null
  • @zero323 你有什么想法吗?
  • @J.D 上面的 json_schema 完全可以正常工作。你能再检查一次吗?是的,我在将值转换为字符串后正在读取 json 文件。
【解决方案2】:

使用任意变量名似乎很简单,但这是有问题的,并且与 PEP8 相反。在处理数字时,我建议避免在迭代此类结构时使用常用名称……即值。

import json

with open('random.json') as json_file:
    data = json.load(json_file)

for k, v in data.items():
    if k == 'y':
        for key, item in v.items():
            item['value'] = float(item['value'])


print(type(data['y']['p']['value']))
print(type(data['y']['q']['value']))
# mac → python3 make_float.py
# <class 'float'>
# <class 'float'>
json_data = json.dumps(data, indent=4, sort_keys=True)
with open('random.json', 'w') as json_file:
    json_file.write(json_data)

【讨论】:

  • 这个问题的关键部分是我们每天产生大约 60GB 的数据,我们需要确保可扩展性,这就是为什么 Spark 是出路
  • 当然这无法处理如此大量的数据。为什么您引用的问题不起作用?从文档中他们给出了一个处理这个问题的例子:ghostbin.com/paste/wt5y6
猜你喜欢
  • 2021-12-08
  • 2021-04-10
  • 2018-01-09
  • 1970-01-01
  • 1970-01-01
  • 2019-01-02
  • 2021-06-16
  • 1970-01-01
  • 2022-11-11
相关资源
最近更新 更多