Kafka 中将 Topic 分为 partition,消费者从 partition 中消费消息。消息是怎么确定发住哪个 partition 呢?其实默认有两种分区选择策略:
- 消息 key 为空时随机选择
- 消息 key 不为空时,对 key 进行 HASH,然后对分区数取模
源码分析
在 KafkaProducer 的 doSend 方法中调用了以下方法进行分区选择,如果指定了分区,则直接使用指定的分区。如果没有指定则通过默认的 partitioner 来计算出分区。
1 2 3 4 5 6 7 8 |
// org.apache.kafka.clients.producer.KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
|
默认使用的分区处理类是 DefaultPartitioner,其内的实现如下:
1 2 3 4 5 6 7 8 9 10 |
// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
|
如果提供了 key 值,则用 murmur2 的方法对 key 进行 HASH 并对 numPartitions(Topic 的分区数)取模。
当 Key 为空时,通过 stickyPartitionCache 的 partition 方法计算出分区。StickyPartitionCache 是 Kafka Client 内部的一个类,用于管理 Topic 的分区选择的逻辑和缓存。
1 2 3 4 5 6 7 8 |
// org.apache.kafka.clients.producer.internals.StickyPartitionCache#partition
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
|
indexCache 是一个 ConcurrentHashMap 对象,对应的是 Topic -> Partition 的映射,如果该值不存在则调用 nextPartition 方法选择一个分区并缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// org.apache.kafka.clients.producer.internals.StickyPartitionCache#nextPartition
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// Check that the current sticky partition for the topic is either not set or that the partition that
// triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {
while (newPart == null || newPart.equals(oldPart)) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}
|
第一个分支条件 oldPart == null || oldPart == prevPartition 中:
-
oldPart == null表示没有分区缓存,对应着新增 topic 或第一次调用的情况 -
oldPart == prevPartition当创建了新的Batch时触发了此方法的情况,对分区缓存进行更新
满足以上条件之一就进入真正选择分区的逻辑。
关于 Batch:
- 每个
batch的数据属于同一个partitionSticker Partitioner是 Kafka 对空 key 的分区选择进行的优化,尽量在一个Batch中提交多几条数据。当Batch满或linger.ms时间到 才触发选择新的分区,在这之前,所有消息都会发到缓存的分区。原来的逻辑会每条消息都选择新的分区,可能造成很多batch太小,客户端请求过多,降低呑吐
后面的逻辑根据可用分区数进行处理,决定新的分区:
- 如果无可用分区,从所有分区里随机选择一个分区
- 只有一个可用分区,直接选用该分区
- 多个可用分区,随机取一个不等于当前分区的可用分区
结论
- 消息 key 为空时,如果有缓存分区,使用缓存分区,没有缓存则随机选择
- 消息 key 不为空时,对 key 进行 HASH,然后对分区数取模
参考:
kafka生产者分区优化
Apache Kafka Producer Improvements with the Sticky Partitioner