【问题标题】:Spring Kafka transaction causes producer per message offset increased by twoSpring Kafka 事务导致生产者每条消息偏移量增加 2
【发布时间】:2019-12-03 08:29:48
【问题描述】:

我在使用 Spring(boot) Kafka 的微服务中有一个消费-转换-生产工作流程。我需要实现 Kafka 事务提供的一次性语义。 这是下面的代码sn-p:

配置

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
    defaultKafkaProducerFactory.setTransactionIdPrefix("kafka-trx-");
    return defaultKafkaProducerFactory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}

@Bean
@Qualifier("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager(KafkaTransactionManager<String, String> kafkaTransactionManager) {
    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    concurrentKafkaListenerContainerFactory.setBatchListener(true);
    concurrentKafkaListenerContainerFactory.setConcurrency(nexusConsumerConcurrency);
    //concurrentKafkaListenerContainerFactory.setReplyTemplate(kafkaTemplate());
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    concurrentKafkaListenerContainerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
    return concurrentKafkaListenerContainerFactory;
}

听众

@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}", containerFactory = "concurrentKafkaListenerContainerFactory")
public void listen(@Payload List<String> msgs, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Integer> offsets) {

    int i = -1;
    for (String msg : msgs) {
        ++i;
        LOGGER.debug("partition={}; offset={}; msg={}", partitions.get(i), offsets.get(i), msg);
        String json = transform(msg);
        kafkaTemplate.executeInTransaction(kt -> kt.send(producerTopic, json));
    }
}

但是在产品环境中,我遇到了一个奇怪的问题。生产者发送的每条消息,偏移量增加 2,消费者不提交消费偏移量。

主题 1 的消费者偏移量

Topic1 消费者详情

制作主题2

但是,生产者发送的消息数与消费者发送的消息数相同。生产者的下游可以连续接收来自topic2的消息。日志中没有发现错误或异常。

我想知道为什么 consumer-transform-produce 工作流程看起来没问题(也可以保证完全一次 scemantics),但是没有提交消耗的偏移量,并且每个 msg 生成的 msg 偏移增量是 2 而不是 1。

如何解决?谢谢!

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    这就是它的设计方式。 Kafka 日志是不可变的,因此在事务结束时使用一个额外的“槽”来指示事务是提交还是回滚。这允许具有read_committed 隔离级别的消费者跳过回滚事务。

    如果您在一个事务中发布 10 条记录,您将看到偏移量增加 11。如果您只发布一条,它将增加 2。

    如果您希望发布参与消费者启动的事务(仅一次),则不应使用executeInTransaction;这将开始一个新的交易。

    /**
     * Execute some arbitrary operation(s) on the operations and return the result.
     * The operations are invoked within a local transaction and do not participate
     * in a global transaction (if present).
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    <T> T executeInTransaction(OperationsCallback<K, V, T> callback);
    

    我不明白为什么消费者偏移量仍不会发送到消费者启动的事务。您应该打开 DEBUG 日志记录以查看发生了什么(如果在您修复模板代码后仍然发生)。

    编辑

    监听器退出时,监听器容器将消耗的偏移量(+1)发送给事务;打开提交日志,你会看到它...

    @SpringBootApplication
    public class So59152915Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59152915Application.class, args);
        }
    
        @Autowired
        private KafkaTemplate<String, String> template;
    
        @KafkaListener(id = "foo", topics = "so59152915-1", clientIdPrefix = "so59152915")
        public void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
            System.out.println(in + "@" + offset);
            this.template.send("so59152915-2", in.toUpperCase());
            Thread.sleep(2000);
        }
    
        @KafkaListener(id = "bar", topics = "so59152915-2")
        public void listen2(String in) {
            System.out.println(in);
        }
    
        @Bean
        public NewTopic topic1() {
            return new NewTopic("so59152915-1", 1, (short) 1);
        }
    
        @Bean
        public NewTopic topic2() {
            return new NewTopic("so59152915-2", 1, (short) 1);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
            return args -> {
                this.template.executeInTransaction(t -> {
                    IntStream.range(0, 11).forEach(i -> t.send("so59152915-1", "foo" + i));
                    try {
                        System.out.println("Hit enter to commit sends");
                        System.in.read();
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
            };
        }
    
    }
    
    @Component
    class Configurer {
    
        Configurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
            factory.getContainerProperties().setCommitLogLevel(Level.INFO);
        }
    
    }
    

    spring.kafka.producer.transaction-id-prefix=tx-
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.kafka.consumer.auto-offset-reset=earliest
    

    foo0@56
    2019-12-04 10:07:18.551  INFO 55430 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-1-0=OffsetAndMetadata{offset=57, leaderEpoch=null, metadata=''}}
    foo1@57
    FOO0
    2019-12-04 10:07:18.558  INFO 55430 --- [      bar-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-2-0=OffsetAndMetadata{offset=63, leaderEpoch=null, metadata=''}}
    2019-12-04 10:07:20.562  INFO 55430 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-1-0=OffsetAndMetadata{offset=58, leaderEpoch=null, metadata=''}}
    foo2@58
    

    【讨论】:

    • 你不应该使用executeInTransaction
    • 使用 kafkaTemplate.send() 仍然不会提交消耗的偏移量。如果不是 send() 我应该调用哪个方法?
    • 监听器退出时,监听器容器向事务发送消耗的偏移量(+1);打开提交日志,你会看到它。请参阅我的答案的编辑。
    • 非常感谢。我按照您所说的设置了提交日志级别。 INFO 日志显示发送到事务的消耗的偏移量是正确的。由于 kafka 事务的某些机制,kafka monitor web ui 可能无法正确显示消费者偏移量。我不知道为什么。
    【解决方案2】:

    请注意您的自动提交设置。如我所见,您将其设置为 false:

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    

    因此,在这种情况下,您需要“手动”提交或将自动提交设置为 true。

    【讨论】:

    • Kafka事务会在事务处理中提交偏移量。我不应该自己提交偏移或切换自动提交。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-01-15
    • 2016-03-28
    • 2019-06-14
    • 2018-02-08
    • 2021-03-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多