【发布时间】:2019-05-17 17:07:35
【问题描述】:
我正在使用 springboot 2、kafk 2.2.0、spring-kafka 2.2.5 制作项目
我制作了kafka exactly once 环境,消息生产和消费都很好。
但是kafka-consumer-groups.sh 是这样说的。
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test_topic 0 23 24 1
test_topic 1 25 26 1
test_topic 2 21 22 1
我只向 kafka 发送了一条消息,但 LOG-END-OFFSET 加倍了,并且始终保持 1 延迟。 (在我的 java 应用程序中,生产和消费按预期工作)
我不知道为什么 LOG-END-OFFSET 加倍。
如果删除exactly once config,LOG-END-OFFSET 和CURRENT-OFFSET 计数没有问题。
这是我的kafkaTemplate 设置代码。
@Bean
@Primary
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// exactly once producer setup
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
factory.setTransactionIdPrefix("my.transaction.");
return factory;
}
@Bean
@Primary
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@Primary
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
我的生产者代码。
kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));
我检查了 LOG-END-OFFSET 何时翻倍,是 produce transaction commit 计时。
我做错了什么配置?
【问题讨论】:
-
这没什么意义。如果对数结束偏移加倍并且滞后保持为 1,则意味着当前偏移量也增加/加倍 + 1。您确定只发送 1 条消息吗?
-
是的。当然,我只发送了 1 条消息,而我的消费者只消费了 1 条消息。
-
并且当前的偏移量也增加了 > +1?
-
CURRENT-OFFSET 也翻了一番,除了消耗第一条消息。消费第一条消息时,CURRENT-OFFSET 加了 1,出现了 1 滞后。
标签: java apache-kafka spring-kafka