【问题标题】:How Kafka Handles Keyed Message Related to PartitionKafka 如何处理与分区相关的键控消息
【发布时间】:2019-10-30 00:59:24
【问题描述】:
谁能解释一下:
- 实际上 Kafka 是如何存储键控消息的?分区是否只分配给一个键?我的意思是,分区是否可以存储具有多个键的消息?
- 如果第一个问题的答案是肯定的,那么如果 key 的数量超过可用的分区怎么办?
我的用例是,我正在考虑将大量船舶数据发送给经纪人并将其存储为 ship_id(如果您知道的话,MMSI)作为密钥。问题是,我不知道那时会收到多少船。所以我不能提前定义分区号。
【问题讨论】:
标签:
apache-kafka
data-partitioning
【解决方案1】:
一个分区是否可以存储具有多个键的消息?
是的,murmur2 哈希(Kafka 使用的算法),mod 一个主题中的分区数可以得到相同的数字。例如,如果你只有一个分区,那么任何 key 显然都会进入同一个分区
如果key的数量超过可用的分区怎么办?
哈希是取模的,所以它总是被分配一个有效的分区
现在,如果您有一个明确定义的键,则可以保证将消息排序到分区中,因此分区数量的答案实际上归结为单个分区可以处理多少吞吐量,并且没有简短的答案 -您发送了多少数据,一位消费者在“峰值”消费时从一个分区获取数据的速度有多快?进行适当的性能测试,然后在新主题上扩展分区数以处理潜在的未来负载
您还需要考虑“热”/“冷”数据。例如,如果您有 10 个分区映射到 ID 的第一个数字,那么您的所有数据都以偶数开头,那么您最终将有一半的分区为空
【解决方案2】:
1. Kafka 消息是键和值的形式,它存储在主题中。主题被划分为多个分区器,每个
分区进一步划分为段,每个段都有一个日志文件
以键值形式和索引或偏移量存储实际消息
消息。
Key 是可选的,用于标识要存储消息的分区,如果 key 为 null,则消息以循环方式存储,而如果 key 不为 null,则它将使用具有模块分区大小的 hash key,以保证选择其中之一分区。
例如
hash(key)%num_partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
因此,由于它的使用模块,它总是会在可用分区范围内存储消息,这就是多个键可能进入同一个分区的原因。消息键的主要好处是分桶相同的消息键应该去同一个分区。
2. 所以你不用担心可以根据key的数量来定义partition的数量。如上所述,密钥用于根据默认分区器逻辑将消息分桶到不同的分区。分区号基本上有助于将流程并行化到高吞吐量。
注意:您还要确保通过使用 key for partitioned data 可能会导致
分配不均,所以如果您不担心,只需保留键为空,在循环中选择分区
其他方法是创建自定义分区器以进一步细化分区选择逻辑。
here