【问题标题】:Spark job writing to parquet - has a container with physical memory that keeps increasingSpark 作业写入 parquet - 有一个物理内存不断增加的容器
【发布时间】:2018-05-21 15:37:27
【问题描述】:

我有一个 spark 流应用程序,它从 kafka 主题中读取数据并将数据以 parquet 格式写入 hdfs。 我看到在一段时间内(非常短的时间)容器的物理内存不断增长,直到达到最大大小并失败 “诊断:容器 [pid=29328,containerID=container_e42_1512395822750_0026_02_000001] 运行超出物理内存限制。当前使用情况:使用了 1.5 GB 的 1.5 GB 物理内存;使用了 2.3 GB 的 3.1 GB 虚拟内存。正在杀死容器。” 被杀死的容器与运行驱动程序的容器相同,因此应用程序也被杀死。 在寻找这个错误时,我只看到了增加内存的解决方案,但我认为这只会推迟问题。 我想了解如果我不在内存中保存任何内容,为什么内存会不断增加。 我还看到所有容器的内存都增加了,但它们只是在一段时间后被杀死(在达到最大值之前)。 我在一些帖子中看到“您的工作是写出 Parquet 数据,Parquet 在将数据写入磁盘之前将其缓冲在内存中”。

我们正在使用的代码(我们也尝试过不重新分区 - 不确定是否需要):

val repartition = rdd.repartition(6)
val df: DataFrame = sqlContext.read.json(repartition)
df.write.mode(SaveMode.Append).parquet(dbLocation)

有什么办法可以解决不断增加的内存问题吗?

创建的 parquet 文件

显示内存增加的nodeManager日志

【问题讨论】:

    标签: apache-spark parquet


    【解决方案1】:

    假设您的应用程序除了写入之外什么都不做,我怀疑根本原因是批量接收的数据大小。在其中一个批次中接收到的数据可能超出配置的阈值。假设应用程序在本赛季被终止,解决方案是启用“back pressure”。 解决方案在下面的帖子中已经足够详细了。

    Limit Kafka batches size when using Spark Streaming

    【讨论】:

    • 内存不是一次增加的,只是随着时间的推移一直在增加,所以看起来数据真的被缓存了,但没有被垃圾收集器删除。我们的应用程序至少在 9 小时后终止
    • 您是否在数据管道中使用任何转换,如联合或按键更新?
    • 您的代码是否在目的地写入任何 parquet 文件?您知道上一个批次的文件是否在下一个批次的处理开始之前保存吗?在我看来,来自多个批次的数据在驱动程序中排队等待写入 HDFS 文件夹。您是在读取最新批次的 Kafka 流还是读取整个流?
    • 我们每 2 分钟分批从 kafka 读取数据,因此假设前一批已写入 HDFS,但会检查确定。我每次都只阅读来自 kafka 的新消息(使用 kafka 的 spark 流)
    • 我添加了创建的 parquet 文件和数据管理器日志的屏幕截图
    猜你喜欢
    • 2015-10-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-24
    • 1970-01-01
    • 1970-01-01
    • 2020-02-03
    相关资源
    最近更新 更多