Question title: Why do I lose messages when using Kafka with EOS Beta?
Posted: 2020-11-03 15:24:53
Question description:

I'm using spring-kafka 2.5.5.RELEASE and I lose messages when using Kafka with exactly-once semantics (EOS).

Configuration

My listeners use @KafkaListener with this container configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaBatchListenerContainerFactory(
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                AfterRollbackProcessor<Object, Object> afterRollbackProcessor,
                KafkaTransactionManager<Object, Object> kafkaTransactionManager,
                ConsumerRecordRecoverer myRecoverer
        ) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    factory.setAfterRollbackProcessor(afterRollbackProcessor);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
    factory.setBatchListener(true);
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(myRecoverer));
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}
@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer);
}

Properties:

spring.kafka.producer.acks = all
spring.kafka.consumer.isolation-level = read_committed
spring.kafka.consumer.group-id = ap_u2
# Use hostname to generate unique transaction id per instance
spring.kafka.producer.transaction-id-prefix = tx_ap_u2_${HOSTNAME}_

The topic has 3 partitions, and my application runs as 3 instances, each consuming 1 partition.

Problem

I'm losing some messages in a read-process-write scenario: the processed messages never reach the output topic.

The DLT is empty, and the listener keeps processing messages.

The message loss coincides with these error logs:

Nov 2, 2020 @ 16:04:23.696
org.springframework.kafka.core.DefaultKafkaProducerFactory
commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:23.788
org.springframework.kafka.core.DefaultKafkaProducerFactory
Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]

Nov 2, 2020 @ 16:04:23.979
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:23.981
org.apache.kafka.clients.producer.internals.Sender
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

26 times on different messages:
Nov 2, 2020 @ 16:04:23.985 to Nov 2, 2020 @ 16:04:24.787
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

This log, 3 times:
Nov 2, 2020 @ 16:04:24.893
org.apache.kafka.clients.NetworkClient
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Uncaught error in request completion:
java.lang.IllegalStateException: Should not reopen a batch which is already aborted.
    at org.apache.kafka.common.record.MemoryRecordsBuilder.reopenAndRewriteProducerState(MemoryRecordsBuilder.java:295)

6 times:
Nov 2, 2020 @ 16:04:24.890 to Nov 2, 2020 @ 16:04:24.887
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:25.085
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerCons
Producer or 'group.instance.id' fenced during transaction
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Question

Why did I lose 244 messages with a transactional read-process-write listener using EOS Beta semantics? The transaction was fenced without any further information.

Thanks

Tags: apache-kafka spring-kafka


    Answer 1:

    Producer or 'group.instance.id' fenced during transaction

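    With a ProducerFencedException, the after-rollback processor is not called, because the exception means the consumer has already lost those partitions. You should see rebalance activity after that error. Since the offsets were not sent to the transaction, the same records should be redelivered, either to this instance or to another one.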
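The reasoning in this answer can be sketched with a small, JDK-only simulation. It is a hypothetical model, not the Kafka or spring-kafka API: one partition, the output records and the consumer offset are committed together or not at all (as in a Kafka transaction that includes the offsets), and a consumer that is newly assigned the partition resumes from the last committed offset. Under those assumptions, a fenced (aborted) transaction followed by a rebalance leads to redelivery rather than loss.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a transactional read-process-write loop on one partition (not a Kafka API). */
public class FencedRedeliverySketch {

    /** Committed state: what the output topic and the group's committed offset would contain. */
    static final class Partition {
        final List<String> input;
        final List<String> output = new ArrayList<>(); // committed output records only
        long committedOffset = 0;                        // where a newly assigned consumer resumes

        Partition(List<String> input) {
            this.input = input;
        }

        /** The output records and the next offset become visible together, never separately. */
        void commitTransaction(List<String> produced, long nextOffset) {
            output.addAll(produced);
            committedOffset = nextOffset;
        }
    }

    /**
     * Processes one batch starting at the committed offset. If the producer is fenced, the
     * transaction is aborted, so neither the output nor the offset is committed.
     */
    static void processBatch(Partition p, int batchSize, boolean fenced) {
        int start = (int) p.committedOffset;
        int end = Math.min(start + batchSize, p.input.size());
        List<String> produced = new ArrayList<>();
        for (String record : p.input.subList(start, end)) {
            produced.add(record.toUpperCase()); // the "process" step
        }
        if (!fenced) {
            p.commitTransaction(produced, end);
        }
    }

    public static Partition run() {
        Partition p = new Partition(List.of("a", "b", "c", "d"));
        processBatch(p, 2, false); // commits A, B and offset 2
        processBatch(p, 2, true);  // fenced: nothing committed for c, d
        processBatch(p, 2, false); // after the rebalance, c, d are re-read from offset 2
        return p;
    }

    public static void main(String[] args) {
        Partition p = run();
        System.out.println(p.output + " committedOffset=" + p.committedOffset);
    }
}
```

Running `main` prints `[A, B, C, D] committedOffset=4`: the fenced batch is simply processed again from the last committed offset.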

    If you don't see a rebalance after that error, then something very strange is going on.

    EDIT

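    I just wrote a small test application and it behaves exactly as expected: running 2 instances, I get the same errors as you, and it keeps failing on the same record.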

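    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class So64665725Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So64665725Application.class, args);
        }
    
        @Autowired
        KafkaTemplate<String, String> template;
    
        @KafkaListener(id = "so64665725", topics = "so64665725-1")
        public void listen(String in) throws Exception {
            System.out.println(in);
            // Sleep longer than max.poll.interval.ms (10s, see the properties below) so the consumer
            // is forced out of the group, triggering a rebalance while the transaction is still open.
            Thread.sleep(15000);
            try {
                System.out.println(this.template.send("so64665725-2", in.toUpperCase()).get(10, TimeUnit.SECONDS)
                        .getRecordMetadata());
            }
            catch (ExecutionException e) {
                // Rethrow the real cause (e.g. ProducerFencedException) so the container sees it.
                e.getCause().printStackTrace();
                throw (Exception) e.getCause();
            }
            catch (TimeoutException e) {
                throw e;
            }
        }
    
        @Bean
        public NewTopic topic1() {
            return TopicBuilder.name("so64665725-1").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("so64665725-2").partitions(1).replicas(1).build();
        }
    
    }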
    
    spring.kafka.consumer.properties.max.poll.interval.ms=10000
    spring.kafka.consumer.auto-offset-reset=earliest
    
    spring.kafka.producer.acks=all
    spring.kafka.producer.transaction-id-prefix=tx-
    

    This uses Boot 2.4.0-RC1 and SK (spring-kafka) 2.6.2, which uses EOSMode.BETA by default.

    Comments:

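    • I don't see any other logs (I'm logging at WARN/ERROR level). The application is still running, with Prometheus metrics available, but I don't know whether any of them would help.
    • Do you reproduce the same error? I still don't understand why my messages were lost without any further information, other than that the transaction expired and the messages were skipped instead of rolled back.
    • Yes; I got the same errors as you, and no records were lost.
    • Any clue about the cause? :s It looks like the batch failed and afterRollback was not called, so the offsets were not committed. But the listener then moved on to the next batch and committed it, so the uncommitted batch was lost?
    • As I said, something strange must be going on: after a ProducerFencedException we can't re-seek the partitions (in the DARP, the DefaultAfterRollbackProcessor) because we no longer own them. If you can reliably reproduce this scenario with a small, complete example, I can take a look at what's wrong.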
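The suspicion raised in these comments (a failed batch is not rolled back, the listener moves on, and the next commit skips the failed records) can be illustrated with a hypothetical, JDK-only model. It assumes the fenced transaction is followed by neither a rebalance nor a seek, so the consumer's in-memory position stays past the failed batch; the `seekOnFailure` flag stands in for what a rollback processor's re-seek would do. None of the names are Kafka or spring-kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the suspected loss; not the Kafka or spring-kafka API. */
public class FencedSkipSketch {

    static final class Consumer {
        final List<String> input;
        final List<String> output = new ArrayList<>(); // committed output records
        long position = 0;        // in-memory fetch position, advanced by every poll
        long committedOffset = 0; // offset committed with the last successful transaction

        Consumer(List<String> input) {
            this.input = input;
        }

        /** Returns up to batchSize records from the in-memory position and advances it. */
        List<String> poll(int batchSize) {
            int from = (int) position;
            int to = Math.min(from + batchSize, input.size());
            position = to;
            return new ArrayList<>(input.subList(from, to));
        }
    }

    /**
     * Runs one read-process-write batch. When fenced, the transaction is aborted; if
     * seekOnFailure is true the position is rewound to the committed offset, otherwise
     * it stays past the failed batch.
     */
    static void processBatch(Consumer c, int batchSize, boolean fenced, boolean seekOnFailure) {
        List<String> produced = new ArrayList<>();
        for (String record : c.poll(batchSize)) {
            produced.add(record.toUpperCase());
        }
        if (!fenced) {
            c.output.addAll(produced);      // output and offset committed together
            c.committedOffset = c.position;
        } else if (seekOnFailure) {
            c.position = c.committedOffset; // the failed batch is fetched again on the next poll
        }
        // Fenced without a seek: nothing is committed, but the next poll starts after this batch.
    }

    public static Consumer run(boolean seekOnFailure) {
        Consumer c = new Consumer(List.of("a", "b", "c", "d", "e", "f"));
        processBatch(c, 2, false, seekOnFailure); // a, b committed
        processBatch(c, 2, true, seekOnFailure);  // c, d fenced
        processBatch(c, 2, false, seekOnFailure);
        processBatch(c, 2, false, seekOnFailure);
        return c;
    }

    public static void main(String[] args) {
        System.out.println("no seek:   " + run(false).output);
        System.out.println("with seek: " + run(true).output);
    }
}
```

With `run(false)` the committed output is `[A, B, E, F]` even though the committed offset reaches 6, so c and d are silently skipped; with `run(true)` all six records are committed.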
    Answer 2:

    This is a known Kafka bug: KAFKA-9803

    A workaround was implemented in spring-kafka 2.5.8.RELEASE and 2.6.3.RELEASE.

    See the stopContainerWhenFenced option.
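Applied to the factory from the question, the workaround amounts to one extra container property. This is a sketch, assuming spring-kafka 2.5.8.RELEASE / 2.6.3.RELEASE or later, where this answer says the option was added; the class name `KafkaListenerConfig` is hypothetical and the rest of the bean is copied from the question. A stopped container does not restart by itself, so you would also need some way to notice and restart it.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.DefaultBatchToRecordAdapter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaListenerConfig { // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaBatchListenerContainerFactory(
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            AfterRollbackProcessor<Object, Object> afterRollbackProcessor,
            KafkaTransactionManager<Object, Object> kafkaTransactionManager,
            ConsumerRecordRecoverer myRecoverer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(myRecoverer));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        // Stop the container after a ProducerFencedException instead of letting it carry on polling.
        factory.getContainerProperties().setStopContainerWhenFenced(true);
        return factory;
    }
}
```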
