【发布时间】:2018-11-27 10:05:07
【问题描述】:
我的 spark DataFrame 中有一个名为 event_data 的列,格式为 json,在使用 from_json 阅读后,我得到了这个架构:
root
|-- user_id: string (nullable = true)
|-- event_data: struct (nullable = true)
| |-- af_content_id: string (nullable = true)
| |-- af_currency: string (nullable = true)
| |-- af_order_id: long (nullable = true)
我只需要本专栏中的af_content_id。该属性可以是不同的格式:
['ghhjj23','123546',12356]
af_content_id)
我想使用explode 函数来为af_content_id 中格式为List 的每个元素返回一个新行。但是当我应用它时,我得到一个错误:
from pyspark.sql.functions import explode
def get_content_id(column):
return column.af_content_id
df_transf_1 = df_transf_1.withColumn(
"products_basket",
get_content_id(df_transf_1.event_data)
)
df_transf_1 = df_transf_1.withColumn(
"product_id",
explode(df_transf_1.products_basket)
)
由于数据类型不匹配,无法解析 'explode(
products_basket)':explode 函数的输入应该是数组或映射类型,而不是 StringType;
我知道原因,是因为af_content_id字段可能包含的类型不同,但我不知道如何解决。直接在列上使用pyspark.sql.functions.array() 是行不通的,因为它变成了array 的array,并且explode 不会产生预期的结果。
重现我坚持的步骤的示例代码:
import pandas as pd
arr = [
['b5ad805c-f295-4852-82fc-961a88',12732936],
['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
['0E3D17EA-BEEF-4931-8104','12909841'],
['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
]
df = pd.DataFrame(arr, columns = ['user_id','products_basket'])
df = df[['user_id','products_basket']].astype(str)
df_transf_1 = spark.createDataFrame(df)
我正在寻找一种将 products_basket 转换为唯一可能的格式的方法:Array,这样当我应用 explode 时,它会包含一个 id行。
【问题讨论】:
-
原因是
af_content_id1是StringType,不能爆。它可能看起来有时像一个列表,有时像一个字符串,但它实际上始终是一个字符串。可能有解决方法 - 请提供一个小的 reproducible example 与您想要的输出。