【问题标题】:Problem running a Pandas UDF on a large dataset在大型数据集上运行 Pandas UDF 的问题
【发布时间】:2020-04-16 23:44:21
【问题描述】:

我目前正在从事一个项目,我很难理解 PySpark 中的 Pandas UDF 是如何工作的。

我有一个 Spark 集群,其中包含一个具有 8 核和 64GB 的主节点,以及两个 16 核和 112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区约 78M 行。数据集由 70 列组成。 我定义了一个 Pandas UDF 来对数据集执行一些操作,这些操作只能使用 Python 在 Pandas 数据帧上完成。

pandas UDF 是这样定义的:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)

绝对没有办法让 Pandas UDF 工作,因为它甚至在执行操作之前就崩溃了。我怀疑某处存在OOM错误。上面的代码运行了几分钟,然后崩溃并显示连接已重置的错误代码。 但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行良好,没有错误。该错误似乎仅在使用 PandasUDF 时发生。

我不明白它是如何工作的。 Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驱动内存?执行人?如果它在驱动程序上,是否所有 Python 代码都在其上执行?

集群配置如下:

  • SPARK_WORKER_CORES=2
  • SPARK_WORKER_MEMORY=64g
  • spark.executor.cores 2
  • spark.executor.memory 30g(为 python 实例留出内存)
  • spark.driver.memory 43g

是我遗漏了什么还是无法通过 PandasUDF 运行 78M 行?

【问题讨论】:

    标签: python apache-spark pyspark pyarrow


    【解决方案1】:

    Spark 是否尝试一次转换整个分区(78M 行)?

    这正是发生的事情。 Spark 3.0 增加了对分块 UDF 的支持,这些 UDF 在 Pandas DataFramesSeries 的迭代器上运行,但如果在数据集上进行操作,则只能在 Pandas 数据帧上使用 Python 完成,这些可能不是您的正确选择。

    如果是这样,它使用什么内存?驱动内存?执行人的?

    每个分区都在本地处理,在各自的执行程序上,数据通过箭头流传输到 Python 工作者和从 Python 工作者传出。

    是我遗漏了什么,还是无法通过 PandasUDF 运行 78M 行?

    只要您有足够的内存来处理 Arrow 输入、输出(尤其是在复制数据时)、辅助数据结构以及 JVM 开销,它应该可以很好地处理大型数据集。

    但在如此小的集群上,您最好使用 Pandas 直接对输出进行分区和读取数据,而完全不使用 Spark。这样,您将能够使用所有可用资源(即 > 100GB/解释器)进行数据处理,而不是将这些资源浪费在次要任务上(具有 16GB - 开销/解释器)。

    【讨论】:

    • 非常感谢您的回答,我更了解它是如何工作的。对于我拥有的集群,您认为什么是可接受的 spark 配置?
    • 我尝试将 spark.executor.memory 设置为最大值,同时将 spark.python.worker.memory 设置为该值的 50%(占 35g 以上),它仍然崩溃.这个配置有问题吗?
    • spark.python.worker.memory 在这里完全没有意义(它适用于 RDD 聚合,你这些不会在这里发生)。至于配置 - 对于本地 Spark 作业,您可能会找到一些最佳点,但对于您描述的作业,我真的不明白这一点。
    【解决方案2】:

    要回答有关在大型 pyspark 数据帧上使用 Pandas UDF 的一般问题:

    如果您遇到内存不足错误,例如 java.lang.OutOfMemoryError : GC overhead limit exceededjava.lang.OutOfMemoryError: Java heap space 并且增加内存限制不起作用,请确保启用 pyarrow。默认禁用。

    在 pyspark 中,您可以使用以下方式启用它:

    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    更多信息here.

    【讨论】:

      猜你喜欢
      • 2018-11-19
      • 1970-01-01
      • 2014-02-18
      • 2019-06-29
      • 1970-01-01
      • 2013-09-13
      • 2020-12-19
      • 2011-03-18
      • 2012-06-09
      相关资源
      最近更新 更多