【发布时间】:2020-03-03 05:54:52
【问题描述】:
我有一个数据集,我想使用多个 Pyspark SQL Grouped Map UDF 在 AWS EMR 中的临时集群上运行的大型 ETL 流程的不同阶段进行映射。 Grouped Map API 要求在应用之前对 Pyspark 数据框进行分组,但我实际上不需要对键进行分组。
目前,我正在使用任意分组,它有效,但结果是:
不必要的洗牌。
每个作业中任意 groupby 的 Hacky 代码。
我的理想解决方案允许在没有任意分组的情况下应用矢量化 Pandas UDF,但如果我可以保存至少可以消除随机分组的任意分组。
编辑:
这是我的代码的样子。我最初使用的是任意分组,但目前正在根据@pault 下面的评论尝试spark_partition_id()。
@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
b = a_partition.drop("pid", axis=1)
# Some other transform stuff
return b
(sql
.read.parquet(a_path)
.withColumn("pid", spark_partition_id())
.groupBy("pid")
.apply(transform)
.write.parquet(b_path))
使用spark_partition_id() 似乎仍然会导致洗牌。我得到以下 DAG:
第一阶段
- 扫描镶木地板
- 项目
- 项目
- 交流
第二阶段
- 交流
- 排序
- FlatMapGroupsInPandas
【问题讨论】:
-
请分享分组代码。你试过什么?到底是什么失败了?
-
@pault 使用分区 ID 似乎仍会导致随机播放。
标签: python pandas apache-spark pyspark pyspark-sql