【问题标题】:PySpark application fail with java.lang.OutOfMemoryError: Java heap spacePySpark 应用程序因 java.lang.OutOfMemoryError 失败:Java 堆空间
【发布时间】:2018-07-20 09:07:36
【问题描述】:

我分别通过 pycharm 和 pyspark shell 运行 spark。 我遇到了这个错误:

: java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416)
    at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:748)

我的代码是:

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
import time

if __name__ == '__main__':

    print("Started at " + time.strftime("%H:%M:%S"))

    conf = (SparkConf()
            .setAppName("TestRdd") \
            .set('spark.driver.cores', '1') \
            .set('spark.executor.cores', '1') \
            .set('spark.driver.memory', '16G') \
            .set('spark.executor.memory', '9G'))
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(range(1000000000),100)

    print(rdd.take(10))

    print("Finished at " + time.strftime("%H:%M:%S"))

这些是最大内存设置,我可以在集群上设置。我试图将所有内存分配给 1 个核心来创建 rdd。但在我看来,应用程序在分发数据集之前失败了。它在创建我假设的步骤时失败。我还尝试将不同数量的分区设置为 100-10000。我已经计算出它需要多少内存,所以 10 亿 int - 大约 4.5-4.7Gb 内存,比我少,但没有运气。

如何优化和强制运行我的代码?

【问题讨论】:

    标签: python python-2.7 apache-spark pyspark rdd


    【解决方案1】:

    TL;DR 不要在外部测试和简单实验中使用parallelize。因为您使用的是 Python 2.7,range 并不懒惰,因此您将实现多种类型的全范围值:

    • 调用后的 Python list
    • 序列化版本,稍后将写入磁盘。
    • 在 JVM 上加载的序列化副本。

    使用 xrange 会有所帮助,但你不应该首先使用 parallelize(或 2018 年的 Python 2)。

    如果您想创建一系列值,只需使用SparkContext.range

    range(start, end=None, step=1, numSlices=None)

    创建一个新的 int RDD,包含从头到尾的元素(不包括在内),每个元素逐步增加。可以像 python 的内置 range() 函数一样调用。如果使用单个参数调用,则该参数被解释为 end,并且 start 设置为 0。

    所以在你的情况下:

    rdd = sc.range(1000000000, numSlices=100)
    

    DataFrame:

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.range(1000000000, numPartitions=100)
    

    【讨论】:

    • sc.range 只需要存储每个分区的开头和结尾。
    猜你喜欢
    • 2021-11-23
    • 1970-01-01
    • 2021-09-09
    • 2011-10-24
    • 2017-05-13
    • 1970-01-01
    • 1970-01-01
    • 2018-08-13
    • 2014-12-23
    相关资源
    最近更新 更多