【问题标题】:In kafka, When producing message with transactional, Consumer offset doubled up在 kafka 中,当使用事务性生成消息时,消费者偏移量增加了一倍
【发布时间】:2019-05-17 17:07:35
【问题描述】:

我正在使用 springboot 2、kafk 2.2.0、spring-kafka 2.2.5 制作项目

我制作了kafka exactly once 环境,消息生产和消费都很好。

但是kafka-consumer-groups.sh 是这样说的。

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test_topic      0          23              24              1   
test_topic      1          25              26              1   
test_topic      2          21              22              1   

我只向 kafka 发送了一条消息,但 LOG-END-OFFSET 加倍了,并且始终保持 1 延迟。 (在我的 java 应用程序中,生产和消费按预期工作)

我不知道为什么 LOG-END-OFFSET 加倍。

如果删除exactly once config,LOG-END-OFFSETCURRENT-OFFSET 计数没有问题。

这是我的kafkaTemplate 设置代码。

@Bean
    @Primary
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // exactly once producer setup
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
        factory.setTransactionIdPrefix("my.transaction.");
        return factory;
    }

    @Bean
    @Primary
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @Primary
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

我的生产者代码。

kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));

我检查了 LOG-END-OFFSET 何时翻倍,是 produce transaction commit 计时。

我做错了什么配置?

【问题讨论】:

  • 这没什么意义。如果对数结束偏移加倍并且滞后保持为 1,则意味着当前偏移量也增加/加倍 + 1。您确定只发送 1 条消息吗?
  • 是的。当然,我只发送了 1 条消息,而我的消费者只消费了 1 条消息。
  • 并且当前的偏移量也增加了 > +1?
  • CURRENT-OFFSET 也翻了一番,除了消耗第一条消息。消费第一条消息时,CURRENT-OFFSET 加了 1,出现了 1 滞后。

标签: java apache-kafka spring-kafka


【解决方案1】:

在使用事务时,Kafka 在日志中插入“control batches”以指示消息是否是事务的一部分。

这些批次也被分配了偏移量,因此即使您只发送了一条记录,您也会看到偏移量增加了 2。

如果你想自己检查,你可以使用 DumpLogSegments 工具来显示你的日志内容并查看控制批次:

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1558083247264 size: 10315 magic: 2 compresscodec: NONE crc: 3531536908 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 10315 CreateTime: 1558083247414 size: 78 magic: 2 compresscodec: NONE crc: 576574952 isvalid: true

我使用事务生产者发送了一条记录,您可以看到第二个条目有isControl: true

【讨论】:

  • 我无法确定,但很有可能。正如在使用事务时提到的,偏移量的含义有些不同
  • 您是说在使用事务时偏移量跳跃是正常的和预期的吗?
  • 是的,事务边界使用偏移量。来自中止事务的消息也保留它们的偏移量
猜你喜欢
  • 2021-03-14
  • 1970-01-01
  • 1970-01-01
  • 2018-06-14
  • 1970-01-01
  • 1970-01-01
  • 2018-02-08
  • 1970-01-01
  • 2020-08-28
相关资源
最近更新 更多