【发布时间】:2016-03-23 07:51:56
【问题描述】:
我有一个 WordCount 程序在 4 个工作节点的 Flink 集群中运行,它从 Kafka 主题中读取数据。
在这个主题中,有很多预加载的文本(单词)。主题中的词满足Zipf 分布。该主题有 16 个分区。每个分区里面有大约 7 亿个数据。
有一个节点比其他节点慢得多。如图所示,worker2 是较慢的节点。但较慢的节点并不总是worker2。从我的测试来看,worker3 或集群中的其他节点也可能更慢。
但是,集群中总是有这么慢的工作节点。在集群中,每个worker节点有4个task slot,总共有16个task slot。
一段时间后,发送到其他工作节点(较慢的节点除外)的发送记录将不再增加。发送到较慢的节点的记录会增加到与其他节点相同的级别,速度要快得多。
有没有人可以解释为什么会发生这种情况?另外,我在设置中做错了什么?
这是集群的吞吐量(在 Keyed Reduce -> Sink 阶段按字数计数)。
从这张图我们可以看出较慢的节点 - node2 的吞吐量远高于其他节点。这意味着 node2 从第一阶段收到了更多的记录。我认为这是因为主题中单词的 Zipf 分布。频率非常高的词映射到node2。
当节点在 Keyed Reduce -> Sink 阶段花费更多计算资源时,从 Kafka 读取数据的速度会降低。当node1、node3、node4对应的partition中的数据全部处理完后,集群的吞吐量下降。
【问题讨论】:
-
这种情况下,kafka topic的partition没有区别。原因是整个主题中的数据倾斜。集群有 4 个工作节点,每个工作节点有 4 个任务槽。似乎工作节点 2 中的所有插槽都接收到比其他节点中的插槽更多的数据。如果我们把node2的两个slot和node1交换一下呢?我的意思是即使我们无法平衡槽的工作量,但我们仍然可以平衡节点的工作量,因为每个节点都有多个任务槽。然后主题的所有分区将以相似的速度被消耗。