【问题标题】:How broadcast a huge rdd in pyspark?如何在pyspark中广播一个巨大的rdd?
【发布时间】:2019-02-25 03:55:43
【问题描述】:

当我打印出我的 rdd 的第一个元素时:

print("input = {}".format(input.take(1)[0]))

我得到的结果是:(u'motor', [0.001,..., 0.9])

[0.001,..., 0.9] 的类型是一个列表。

输入rdd中元素个数等于53304100

当我想按以下方式广播输入 RDD 时,我的问题就来了:

brod = sc.broadcast(input.collect())

生成的异常如下(我只展示了异常的第一部分):

    WARN TaskSetManager: Lost task 56.0 in stage 1.0 (TID 176, 172.16.140.144, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: <lambda>() missing 1 required positional argument: 'document'

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    如果你的 RDD 太大,应用程序可能会遇到 OutOfMemory 错误,这是由于 collect 方法将所有数据拉取到驱动程序的内存通常不够大。

    所以你可以尝试通过

    来增加你的驱动程序的内存
    pyspark --driver-memory 4g
    

    【讨论】:

    • 你的意思是 spark-submit --driver-memory 4g?
    • 我增加了驱动程序内存,我得到了一个新的异常。请看我修改后的帖子
    • 我认为你应该尝试从 spark execor UI 分析,例如 (jaceklaskowski.gitbooks.io/mastering-apache-spark/…)。它不仅包含错误日志,还包含更多信息。可以贴吗?我猜你的 rdd 还是太大了,也许你可以尝试将分区数增加到 100 [rdd.repartition(100)] 以将数据处理分布在节点之间,并将 shuffle 数据保持在 2 GB 以下。此外,广播仅适用于小数据集。您也可以尝试删除 boradcast。
    • 我再次运行代码(severak 次)。当我将分区数增加到 128(2*32*2)时,没有出现旧的异常。我更新帖子以显示新帖子。顺便说一句,如果您有想法,请在stackoverflow.com/questions/54540970/… 上提供帮助
    猜你喜欢
    • 2016-04-11
    • 1970-01-01
    • 1970-01-01
    • 2016-04-18
    • 1970-01-01
    • 2019-09-11
    • 1970-01-01
    • 2022-10-25
    • 2016-04-28
    相关资源
    最近更新 更多