【发布时间】:2020-04-16 23:44:21
【问题描述】:
我目前正在从事一个项目,我很难理解 PySpark 中的 Pandas UDF 是如何工作的。
我有一个 Spark 集群,其中包含一个具有 8 核和 64GB 的主节点,以及两个 16 核和 112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区约 78M 行。数据集由 70 列组成。 我定义了一个 Pandas UDF 来对数据集执行一些操作,这些操作只能使用 Python 在 Pandas 数据帧上完成。
pandas UDF 是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
绝对没有办法让 Pandas UDF 工作,因为它甚至在执行操作之前就崩溃了。我怀疑某处存在OOM错误。上面的代码运行了几分钟,然后崩溃并显示连接已重置的错误代码。 但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行良好,没有错误。该错误似乎仅在使用 PandasUDF 时发生。
我不明白它是如何工作的。 Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驱动内存?执行人?如果它在驱动程序上,是否所有 Python 代码都在其上执行?
集群配置如下:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=64g
- spark.executor.cores 2
- spark.executor.memory 30g(为 python 实例留出内存)
- spark.driver.memory 43g
是我遗漏了什么还是无法通过 PandasUDF 运行 78M 行?
【问题讨论】:
标签: python apache-spark pyspark pyarrow