【问题标题】:Kafka + Spark scalabilityKafka + Spark 可扩展性
【发布时间】:2016-11-04 01:19:55
【问题描述】:

我们有非常简单的 Spark Streaming 作业(用 Java 实现),即:

  • 通过 DirectStream 从 Kafka 读取 JSON(关闭 Kafka 消息的确认)
  • 将 JSON 解析为 POJO(使用 GSON - 我们的消息只有约 300 个字节)
  • 将 POJO 映射到键值元组(值 = 对象)
  • reduceByKey(自定义 reduce 函数 - 始终比较对象的 1 个字段 - 质量 - 并以更高的质量离开对象实例)
  • 将结果存储在状态中(通过 mapWithState 存储每个键质量最高的对象)
  • 将结果存储到 HDFS

JSON 由一组 1000 个 ID(键)生成,所有事件随机分布到 Kafka 主题分区。这也意味着,生成的对象集最多为 1000 个,因为作业只存储每个 ID 质量最高的对象。

我们使用以下参数在 AWS EMR(m4.xlarge = 4 核,16 GB 内存)上运行性能测试:

  • 执行器数 = 节点数(即每个节点 1 个执行器)
  • Kafka 分区数 = 节点数(即在我们的例子中也是执行器)
  • 批量大小 = 10 (s)
  • 滑动窗口 = 20 (s)
  • 窗口大小 = 600 (s)
  • 块大小 = 2000 (ms)
  • 默认并行度 - 尝试了不同的设置,但是当默认并行度为 = 节点数/执行器数时获得最佳结果

Kafka 集群仅包含 1 个代理,在峰值负载期间最多可使用约 30-40%(我们将数据预填充到主题,然后独立执行测试)。我们尝试增加 num.io.threads 和 num.network.threads,但没有明显改善。

性能测试的结果(大约 10 分钟的持续负载)是(YARN master 和 Driver 节点在下面的节点数之上):

  • 2 个节点 - 最多可以处理。 150 000 个事件/秒,没有任何处理延迟
  • 5 个节点 - 280 000 个事件/秒 => 25 % 惩罚如果与预期的“几乎线性可扩展性”相比
  • 10 个节点 - 380 000 个事件/秒 => 50% 惩罚如果与预期的“几乎线性可扩展性”相比

2 个节点的 CPU 利用率为 ~

我们还尝试了其他设置,包括: - 测试低/高数量的分区 - 测试 defaultParallelism 的低/高/默认值 - 使用更多的执行者进行测试(即将资源划分为例如 30 个执行者而不是 10 个) 但是上面的设置给了我们最好的结果。

所以——问题是——Kafka + Spark(几乎)是线性可扩展的吗?如果它的可扩展性比我们的测试显示的要好得多 - 如何改进它。我们的目标是支持成百上千的 Spark 执行器(即可扩展性对我们来说至关重要)。

【问题讨论】:

  • 您的用例是在 reduceByKey 中进行完整的数据洗牌,我想随着您扩展集群,这将变得越来越昂贵。至少,全局性能受到单个最差执行器性能的限制,当您添加执行器时,它只会变得更糟。您可以尝试使用 Kafka 分区器将给定 id 的所有消息都放在一个分区中吗?我认为它应该允许几乎线性的比例。
  • 集群中有多少台kafka服务器?他们之间是否有任何分区复制设置?
  • 有很多不同的部分需要考虑。你的 Kafka 集群有多少个分区?你的检查点间隔有多大?
  • @C4stor - reduceByKey 应该只对每个执行器内的缩减结果进行随机播放 - 这应该是相当少量的数据(每个执行器 1000 条记录)。我们还测试了 1 个分区的性能,但与上述情况相比,情况更糟,因为 Kafka 分区仅分配给 1 个执行程序,其余执行程序空闲直到下一次 shuffle(即繁重的工作 = 读取数据和解析它们只能在 1 个执行器上完成)。
  • @ArturBiesiadowski - 我将更新这个问题 - 我们只有 1 个 Kafka 代理,因为服务器的使用率不到 30-40%。我们已经尝试在 Kafka 配置中增加 IO 和网络线程,但没有显着改进(上面的数字已经用于设置 Kafka 代理的 IO/网络线程的三倍与默认设置)。没有配置分区复制(因为只使用了 1 个代理,所以没有意义)。

标签: performance apache-spark apache-kafka scalability


【解决方案1】:

我们已经通过以下方式解决了这个问题:

  • 增加Kafka集群容量
    • 更多 CPU 能力 - 增加了 Kafka 的节点数量(每 2 个 spark exectur 节点 1 个 Kafka 节点似乎没问题)
    • 更多经纪人 - 基本上每个执行者 1 个经纪人给了我们最好的结果
  • 设置适当的默认并行度(集群中的核心数 * 2)
  • 确保所有节点将有大约。相同的工作量
    • batch size/blockSize 应该是 ~equal 或 executor 数量的倍数

最后,我们已经能够实现由具有 10 个执行程序节点的 spark 集群处理的 1100000 个事件/秒。所做的调整还提高了具有较少节点的配置的性能 -> 当从 2 个到 10 个 Spark 执行器节点(AWS 上的 m4.xlarge)扩展时,我们已经实现了几乎线性的可扩展性。

一开始,Kafka 节点上的 CPU 并没有接近极限,但无法响应 Spark 执行器的需求。

感谢所有建议,特别是 @ArturBiesiadowski,他建议 Kafka 集群的大小不正确。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-16
    • 2011-07-07
    • 2016-11-20
    • 2010-12-03
    • 2019-11-11
    • 1970-01-01
    相关资源
    最近更新 更多