另一种方法可以是自定义函数,以满足您要应用的逻辑。
但是这种方法可能会有点复杂,因为它在实施时几乎没有需要注意的细节。
以上两个答案都更好更快,因为它们使用 Spark Native Functions
这只是使用applyInPandas 的另一种方法
ApplyInPandas 方法
自定义函数 - 包含自定义逻辑
def generate_ctas_list(inp,base_column=None,target_column=None,column_lst=None):
### Sort the values for your Target Columns
tgt_col_values = np.sort(inp[target_column].values)
### Getting the Max Value , to use to generate the List
max_tgt = tgt_col_values[-1]
zeros_lst = np.zeros(max_tgt)
### Substract 1 to ensure indexing starts from 0
tgt_col_values = tgt_col_values - 1
### Update the Zeros List with tgt_col_values to 1
zeros_lst[tgt_col_values] = 1
### Get the singular value for Base Column
base_value = inp.loc[:,base_column].values[0]
### Generate DataFrame from the above custom logic to return
data = [base_value,zeros_lst]
res = pd.DataFrame(data,column_lst).T
return res
实施
from functools import partial
import pandas as pd
import numpy as np
input_list = [
(11,1)
,(11,2)
,(11,7)
,(45,7)
]
sparkDF = sql.createDataFrame(input_list,['ID','CTA'])
schema = StructType([
StructField('ID', DoubleType(), True),
StructField('CTA', ArrayType(DoubleType()), True)
])
partial_func = partial(generate_ctas_list,base_column='ID',target_column='CTA',column_lst=['ID','CTA'])
sparkDF_agg = sparkDF.groupby('ID').applyInPandas(partial_func,schema)
sparkDF_agg.show(truncate=False)
+----+-----------------------------------+
|ID |CTA |
+----+-----------------------------------+
|11.0|[1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
|45.0|[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
+----+-----------------------------------+