【发布时间】:2018-04-16 15:18:51
【问题描述】:
我有一个消费者工作者应用程序,它在内部启动 X 线程数,每个线程都在生成它的 KafkaCosnumer。 Cosnumers 拥有相同的groupId 并订阅相同的主题。因此,每个消费者都能获得公平的分区份额。
处理的本质是我不能丢失消息,也不能允许重复。我运行的kafka版本是0.10.2.1。
这是我面临的问题:消费者线程 1 开始使用消息,并在 poll() 上获得一批消息。我还实现了ConsumerRebalanceListener,因此每次成功处理消息时,它都会被添加到offsets 映射中。 (见下面的代码。)因此,一旦发生重新平衡,我可以在我的分区重新分配给其他消费者之前提交我的偏移量。
有时,为了处理该批次,它需要比max.poll.interval.ms 更长的时间,这是重新平衡发生的地方,分区从消费者 1 中拉出并分配给消费者 2。消费者 1 不知道分区已被撤销并继续处理消息,与此同时,消费者 2 从最后一个偏移量(由 RebalanceListener 提交)拾取并处理相同的消息。
有没有办法通知消费者他已经撤销了分区,以便他可以停止处理循环中已经分配给其他消费者的消息?
public class RebalanceListener<K, V> implements ConsumerRebalanceListener {
private final KafkaConsumer<K, V> consumer;
private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
Maps.newConcurrentMap();
private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);
public RebalanceListener(KafkaConsumer<K, V> consumer) {
this.consumer = consumer;
}
public void addOffset(String topic, int partition, long offset) {
LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
topic, partition, offset);
CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
new OffsetAndMetadata(offset, "commit"));
}
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
return CURRENT_OFFSETS;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
partitions.stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
LOGGER.debug("message=Comitting offsets for partititions [{}]",
CURRENT_OFFSETS.keySet().stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
consumer.commitSync(CURRENT_OFFSETS);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
partitions.stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
}
}
我想我可以在RebalanceListener 内部创建consumerId -- TopicPartition 的并发映射,然后在处理每条消息之前检查当前消费者是否仍与记录相关联(每个ConsumerRecord 都有topic 和partition 字段)。
如果没有 - 打破循环并创建下一个poll()。
如果我的工作应用程序将在一个实例中运行,这将是一个可行的解决方案,即使有多个 KafkaConsumer 线程在旋转。但是一旦我扩大规模,我将无法在静态地图中存储偏移量和消费者主题分区映射。这必须是某种集中式存储、数据库,或者说是 Redis。
但是,在我每次处理一个项目之前,我都必须询问我的记录是否可以被当前的消费者线程合法地处理。在扩展工作应用程序的情况下,这将是对外部存储的网络调用,这会破坏使用 kafka 的目的,因为它会减慢处理速度。我可能只是选择在处理单个项目后执行偏移提交。
【问题讨论】:
-
Kafka 1.0 于上周发布。 0.11 的特性之一应该是 once delivery
-
@StuartLC 我知道
exactly once语义是在 0.11 中引入的,不幸的是,我还不能升级到那个版本。 -
@StuartLC 所以,我阅读了您在上面链接的一篇文章。我不太明白 0.11 中的消费者如何处理我的情况。我消费的消息仍有可能需要超过
max.poll.interval.ms才能得到处理。 (我在处理完一整批味精后提交)。消息将处于进程循环中,直到所有消息都得到处理。在重新平衡发生之前的某个时间。即使发生了再平衡,我的旧消费者仍在继续处理。与此同时,其他消费者接管被撤销的分区。并且消息将被处理两次。
标签: java apache-kafka kafka-consumer-api