【发布时间】:2023-01-18 21:47:17
【问题描述】:
我正在尝试将一个进程从 Pandas 转移到 Pyspark,但我是 Pyspark 的新手。注意:这是一个 EDA 过程,所以我现在不太担心将它作为一个循环,我可以在以后优化它。
设置:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
虚拟数据:
df = ps.DataFrame({'id': ['ID_01', 'ID_02', 'ID_02', 'ID_03', 'ID_03'], 'name': ['Jack', 'John', 'John', 'James', 'Jamie']})
df_pandas = df.to_pandas()
df_spark = df.to_spark()
df
| id | name |
|---|---|
| ID_01 | Jack |
| ID_02 | John |
| ID_02 | John |
| ID_03 | James |
| ID_03 | Jamie |
熊猫代码:
unique_ids = df_pandas['id'].unique()
for unique_id in unique_ids:
names = '; '.join(sorted(df_pandas[df_pandas['id'] == unique_id]['name'].unique()))
df.loc[df['id'] == unique_id, 'name'] = names
df
| id | name |
|---|---|
| ID_01 | Jack |
| ID_02 | John |
| ID_02 | John |
| ID_03 | James; Jamie |
| ID_03 | James; Jamie |
最后一个表是所需的输出。但是,我在 PySpark 中实现这个时遇到了问题。这是我必须要做的:
unique_ids = df_spark.select('id').distinct().collect()
for unique_id in unique_ids:
names = df_spark.filter(df_spark.id == unique_id.id).select('name').distinct()
然后我不确定如何进行下一步;即如何连接生成的单列 DataFrame,也不是如何确保正确替换。
我调查了以下来源,但没有成功(可能是因为我对 PySpark 缺乏经验):
【问题讨论】:
标签: python pandas apache-spark pyspark