【发布时间】:2018-08-27 16:06:22
【问题描述】:
我正在尝试使用在 Kubernetes 上运行的 Apache Spark 2.3 Scala API 来扩展结构化流式处理管道。作业的基本流程是这样的:
- 读取包含约 1,000,000 条记录的静态数据集,这些记录将各个源 ID 映射到输出聚合
- 从 Kafka 读取流数据集,其中包含要聚合的时间序列指标,映射到其源 ID
- 根据源 ID 重新分区每个数据集
- 在源 ID 上加入 2 个数据集(这会将指标映射到正确的输出聚合,同时还会从 kafka 中过滤掉不应聚合的数据)
- 应用水印
- 删除重复项
- 聚合数据
- 写入 Kafka 输出接收器
我在 Kubernetes 上运行并配置了一个包含 30 个执行器的集群,每个执行器具有 3 个核心。 Kafka 目前每个源 ID 每秒传输 600000 个指标,并配置了 600 个分区。我正在尝试将它们全部聚合成 10 个不同的输出(即每个输出聚合由 60000 个不同的源 ID 组成)。我每 10 秒触发一次管道,以处理来自 Kafka 的约 6,000,000 条记录。我的聚合窗口是 1 分钟不重叠,我将水印设置为 30 秒。理想情况下,我想要一个更长的水印来解释迟到的数据,但删除重复/水印阶段似乎是一个瓶颈,特别是在调用垃圾收集器时。以下是我最近运行的管道的一些数据:
Processed And Input Rows Per Second
该图显示管道每秒保持输入行大约 8-9 分钟,但随后橙色线下降到绿线以下(时间轴上约 10:01),并且管道有一个硬时间跟上输入数据速率。我查看了 Spark UI 以寻找有关为什么会出现减速的线索,并发现一个执行程序在删除重复/水印阶段需要 55 秒来执行 GC。以下是舞台的汇总统计数据和事件时间线的放大图:
我尝试了here 建议的多种技术,但结果好坏参半。特别是:
- Kryo 序列化似乎收效甚微。
- 使用这些设置 -XX:+UseG1GC -XX:MaxGCPauseMillis=500,可以减少长时间停顿的频率,但它们仍然会发生。
- 我打开了 GC 日志并通过gceasy 处理它们,并尝试遵循他们的建议。这表明长时间停顿来自 Full GC 事件,并且日志没有显示增加 GC 线程数会有所帮助的症状。平均创建速率为 182.18 mb/秒,平均推广速率为 49.8 mb/秒
- 我尝试将 NewRatio 降低到 1,但这会导致更频繁的长时间停顿且持续时间更短(即每次停顿约 25 秒,而不是 50 多秒)
- 很难知道我的流数据集使用了多少内存,因为如果我尝试缓存它,就会出错。
其余的内存建议就像“尝试修改这个参数或那个参数”,但很难尝试每一个排列,并且它并没有表明我应该期待什么行为。有人可以指出我下一步要遵循的方向吗?我觉得 GC 55 秒是不合理的,应该有一些方法来调整它,这样我的工作就不会受到 1 个执行者的阻碍。
【问题讨论】:
标签: apache-spark kubernetes garbage-collection jvm apache-spark-sql