【问题标题】:How to solve pyspark `org.apache.arrow.vector.util.OversizedAllocationException` error by increasing spark's memory?如何通过增加 spark 的内存来解决 pyspark `org.apache.arrow.vector.util.OversizedAllocationException` 错误?
【发布时间】:2020-02-04 17:59:49
【问题描述】:

我在pyspark 中运行一份工作,我曾经使用grouped aggregate Pandas UDF。这会导致以下(此处为缩写)错误:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer

我很确定这是因为 pandas UDF 接收的组之一很大,如果我减少数据集并删除足够多的行,我可以毫无问题地运行我的 UDF。但是,我想使用我的原始数据集运行,即使我在具有 192.0 GiB RAM 的机器上运行这个 spark 作业,我仍然会遇到同样的错误。 (而 192.0 GiB 应该足以将整个数据集保存在内存中。)

如何为 spark 提供足够的内存来运行需要大量内存的分组聚合 Pandas UDF?

例如,是否有一些我遗漏的 spark 配置可以为 apache 箭头提供更多内存?

更长的错误信息

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in 
----> 1 device_attack_result.count()
      2 
      3 
      4 

/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
    520         2
    521         """
--> 522         return int(self._jdf.count())
    523 
    524     @ignore_unicode_prefix

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...

Full error message here.

【问题讨论】:

  • 您解决过这个问题吗?我也在经历这个......
  • 我相信@artem-vovsia 是正确的,因为我遇到了 Apache Arrows 内部限制。所以我“解决”了这个问题(了解它的原因),但这并没有导致一个简单的解决方案。我不得不经历的艰难解决方案就是通过 Arrow 发送更少的数据。例如,我将所有带有字符串的列编码为整数和其他技巧。

标签: apache-spark pyspark user-defined-functions apache-arrow


【解决方案1】:

据我了解,一个组的所有数据都会在应用函数之前加载到内存中。这可能会导致内存不足异常,尤其是在组大小有偏差的情况下。 maxRecordsPerBatch 的配置不适用于组,您需要确保分组数据适合可用内存。

您可以尝试对数据进行加盐处理,以确保组没有偏差。请参阅下面有关盐渍连接的文章。同样的概念也可以在这里应用

https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

【讨论】:

  • 这也是我的理解。我的问题是:我可以增加 Spark 的可用内存来容纳我的庞大团队吗?这里的一个假设是,我可用的 200GB RAM 不知何故并未全部被 spark 使用。
  • 这取决于集群的配置。您能否分享一下您的集群的详细信息,例如执行器数量、执行器内存、核心等?
【解决方案2】:
  1. 您是否尝试过将--executor-memory spark-submit 选项设置为180g,以便Spark 使用所有可用内存?
  2. 实际上,Spark 似乎不是 OOMing 或典型的数据倾斜问题。当您的一个数据结构达到 Apache Arrow 内部限制时,这看起来是一种相当奇怪的情况 - 缓冲区的大小不能大于 Integer.MAX_VALUE 字节:https://github.com/apache/arrow/blob/157b179812adb8f29e5966682ff1937f85ce192a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java#L42。我不确定 Arrow 的工作原理,但对我来说,您的某个数据点似乎包含超过 4Gbs 的数据

【讨论】:

    【解决方案3】:

    Spark 的 PandasUDF 功能使用 Arrow 框架将 spark DataFrame 转换为 pandas DataFrame,此时 Arrow 内部缓冲区限制仅为 2GB,因此您的 pandasUDF 分组条件不应产生超过 2 GB 的未压缩数据。

    df.groupby('id').apply(function)
    

    我是说

    只有当您的分组按分区时,您才能运行您的 pandas UDF 方法 大小小于 2 GB 未压缩

    这是你的参考票

    https://issues.apache.org/jira/browse/ARROW-4890

    上述问题似乎在 >= 0.15 版本的 pyarrow 中得到解决,只有 Spark 3.x 使用 pyarrow 0.15 版本

    【讨论】:

      【解决方案4】:

      箭头 0.16 已将最大缓冲区分配大小从 MaxInteger 更改为 MaxLong(64 位) https://issues.apache.org/jira/browse/ARROW-6112

      截至 2020 年 7 月,上游 Spark 仍基于 Arrow 0.15 https://github.com/apache/spark/blob/master/python/setup.py

      尽管如此,Netty 后备缓冲区仍然不支持这一点。所以您仍有可能将这个问题视为不同的例外。

      因此,由于上述限制,到目前为止,这仍然是不可能的。

      这可能会在 Spark 方面得到解决 https://issues.apache.org/jira/browse/SPARK-32294 想法是将 GroupedData 分批输入 pandas UDF 以解决此问题。

      更新:Databricks 平台上的 PySpark 没有这个问题。需要 DBR7.4+

      【讨论】:

      • 我也遇到过这个问题,升级到 DBR 7.6 解决了这个问题,TYSM
      猜你喜欢
      • 2013-01-15
      • 2013-11-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-21
      • 2014-10-26
      • 1970-01-01
      相关资源
      最近更新 更多