【发布时间】:2016-11-04 01:19:55
【问题描述】:
我们有非常简单的 Spark Streaming 作业(用 Java 实现),即:
- 通过 DirectStream 从 Kafka 读取 JSON(关闭 Kafka 消息的确认)
- 将 JSON 解析为 POJO(使用 GSON - 我们的消息只有约 300 个字节)
- 将 POJO 映射到键值元组(值 = 对象)
- reduceByKey(自定义 reduce 函数 - 始终比较对象的 1 个字段 - 质量 - 并以更高的质量离开对象实例)
- 将结果存储在状态中(通过 mapWithState 存储每个键质量最高的对象)
- 将结果存储到 HDFS
JSON 由一组 1000 个 ID(键)生成,所有事件随机分布到 Kafka 主题分区。这也意味着,生成的对象集最多为 1000 个,因为作业只存储每个 ID 质量最高的对象。
我们使用以下参数在 AWS EMR(m4.xlarge = 4 核,16 GB 内存)上运行性能测试:
- 执行器数 = 节点数(即每个节点 1 个执行器)
- Kafka 分区数 = 节点数(即在我们的例子中也是执行器)
- 批量大小 = 10 (s)
- 滑动窗口 = 20 (s)
- 窗口大小 = 600 (s)
- 块大小 = 2000 (ms)
- 默认并行度 - 尝试了不同的设置,但是当默认并行度为 = 节点数/执行器数时获得最佳结果
Kafka 集群仅包含 1 个代理,在峰值负载期间最多可使用约 30-40%(我们将数据预填充到主题,然后独立执行测试)。我们尝试增加 num.io.threads 和 num.network.threads,但没有明显改善。
性能测试的结果(大约 10 分钟的持续负载)是(YARN master 和 Driver 节点在下面的节点数之上):
- 2 个节点 - 最多可以处理。 150 000 个事件/秒,没有任何处理延迟
- 5 个节点 - 280 000 个事件/秒 => 25 % 惩罚如果与预期的“几乎线性可扩展性”相比
- 10 个节点 - 380 000 个事件/秒 => 50% 惩罚如果与预期的“几乎线性可扩展性”相比
2 个节点的 CPU 利用率为 ~
我们还尝试了其他设置,包括: - 测试低/高数量的分区 - 测试 defaultParallelism 的低/高/默认值 - 使用更多的执行者进行测试(即将资源划分为例如 30 个执行者而不是 10 个) 但是上面的设置给了我们最好的结果。
所以——问题是——Kafka + Spark(几乎)是线性可扩展的吗?如果它的可扩展性比我们的测试显示的要好得多 - 如何改进它。我们的目标是支持成百上千的 Spark 执行器(即可扩展性对我们来说至关重要)。
【问题讨论】:
-
您的用例是在 reduceByKey 中进行完整的数据洗牌,我想随着您扩展集群,这将变得越来越昂贵。至少,全局性能受到单个最差执行器性能的限制,当您添加执行器时,它只会变得更糟。您可以尝试使用 Kafka 分区器将给定 id 的所有消息都放在一个分区中吗?我认为它应该允许几乎线性的比例。
-
集群中有多少台kafka服务器?他们之间是否有任何分区复制设置?
-
有很多不同的部分需要考虑。你的 Kafka 集群有多少个分区?你的检查点间隔有多大?
-
@C4stor - reduceByKey 应该只对每个执行器内的缩减结果进行随机播放 - 这应该是相当少量的数据(每个执行器 1000 条记录)。我们还测试了 1 个分区的性能,但与上述情况相比,情况更糟,因为 Kafka 分区仅分配给 1 个执行程序,其余执行程序空闲直到下一次 shuffle(即繁重的工作 = 读取数据和解析它们只能在 1 个执行器上完成)。
-
@ArturBiesiadowski - 我将更新这个问题 - 我们只有 1 个 Kafka 代理,因为服务器的使用率不到 30-40%。我们已经尝试在 Kafka 配置中增加 IO 和网络线程,但没有显着改进(上面的数字已经用于设置 Kafka 代理的 IO/网络线程的三倍与默认设置)。没有配置分区复制(因为只使用了 1 个代理,所以没有意义)。
标签: performance apache-spark apache-kafka scalability