【问题标题】:Pyspark SQL Pandas Grouped Map without GroupBy?没有GroupBy的Pyspark SQL Pandas分组地图?
【发布时间】:2020-03-03 05:54:52
【问题描述】:

我有一个数据集,我想使用多个 Pyspark SQL Grouped Map UDF 在 AWS EMR 中的临时集群上运行的大型 ETL 流程的不同阶段进行映射。 Grouped Map API 要求在应用之前对 Pyspark 数据框进行分组,但我实际上不需要对键进行分组。

目前,我正在使用任意分组,它有效,但结果是:

  1. 不必要的洗牌。

  2. 每个作业中任意 groupby 的 Hacky 代码。

我的理想解决方案允许在没有任意分组的情况下应用矢量化 Pandas UDF,但如果我可以保存至少可以消除随机分组的任意分组。

编辑

这是我的代码的样子。我最初使用的是任意分组,但目前正在根据@pault 下面的评论尝试spark_partition_id()


@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
  b = a_partition.drop("pid", axis=1)
  # Some other transform stuff
  return b

(sql
  .read.parquet(a_path)
  .withColumn("pid", spark_partition_id())
  .groupBy("pid")
  .apply(transform)
  .write.parquet(b_path))

使用spark_partition_id() 似乎仍然会导致洗牌。我得到以下 DAG:

第一阶段

  1. 扫描镶木地板
  2. 项目
  3. 项目
  4. 交流

第二阶段

  1. 交流
  2. 排序
  3. FlatMapGroupsInPandas

【问题讨论】:

  • 请分享分组代码。你试过什么?到底是什么失败了?
  • @pault 使用分区 ID 似乎仍会导致随机播放。

标签: python pandas apache-spark pyspark pyspark-sql


【解决方案1】:

要支持大致等效的逻辑(函数 (pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame),您必须切换到 Spark 3.0.0 并使用 MAP_ITER 转换。

在最新的预览版 (3.0.0-preview2) 中,您需要一个 UDF:

@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
    for df in dfs:
        b = df.drop("pid", axis=1)
        ...
        yield b

df.mapInPandas(transform)

在即将发布的 3.0.0 版本中 (SPARK-28264) 只是一个简单的函数:

def transform(dfs):
    for df in dfs:
        b = df.drop("pid", axis=1)
        # Some other transform stuff
        ...
        yield b

df.mapInPandas(transform, b_schema)

在 2.x 上可能的解决方法是使用纯 SCALAR UDF,将结果的每一行序列化为 JSON,然后在另一端反序列化,即

import json
from pyspark.sql.functions import from_json

@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
    b = pd.DataFrame({"x": col1, "y": col2})
    ...
    return b.apply(lambda x: json.dumps(dict(zip(df.columns, x))), axis=1)


(df
    .withColumn("json_result", transform("col1", "col2"))
    .withColumn("a_struct", from_json("json_result", b_schema)))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-08-05
    • 2021-02-13
    • 1970-01-01
    • 2017-11-24
    • 1970-01-01
    • 2022-01-25
    • 2016-05-07
    相关资源
    最近更新 更多