【问题标题】:How can I test that I have configured ChainedKafkaTransactionManager correctly in my spring boot service如何测试我是否在我的 Spring Boot 服务中正确配置了 ChainedKafkaTransactionManager
【发布时间】:2021-02-02 21:54:58
【问题描述】:

我的 Spring Boot 服务需要消耗一个主题的 kafka 事件,进行一些处理(包括使用 JPA 写入数据库),然后产生一些关于新主题的事件。无论发生什么情况,我都不能在不更新数据库的情况下发布事件,如果出现任何问题,我希望消费者的下一次轮询重试该事件。我的处理逻辑(包括数据库更新)是幂等的,所以重试就可以了

认为我已经通过使用像这样的 ChainedKafkaTransactionManager 实现了 https://docs.spring.io/spring-kafka/reference/html/#exactly-once 中描述的恰好一次语义:

@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
    kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager(kafka, jpa); 
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ChainedKafkaTransactionManager chainedTransactionManager) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setTransactionManager(chainedTransactionManager);

    return factory;
}

我的 application.yaml 文件中相关的 kafka 配置如下所示:

  kafka:
    ...
    consumer:
      group-id: myGroupId
      auto-offset-reset: earliest
      properties:
        isolation.level: read_committed
      ...
    producer:
      transaction-id-prefix: ${random.uuid}
      ...

因为提交顺序对我的应用程序至关重要,我想编写一个集成测试来证明提交以所需的顺序发生,并且如果在提交到 kafka 期间发生错误,那么原始事件会再次被消耗。但是,我正在努力寻找一种导致 db 提交和 kafka 提交之间失败的好方法。

我有什么建议或替代方法吗?

谢谢

【问题讨论】:

    标签: spring spring-boot transactions spring-kafka


    【解决方案1】:

    您可以使用自定义ProducerFactory 来返回MockProducer(由kafka-clients 提供。

    设置commitTransactionException,以便在 KTM 尝试提交事务时抛出它。

    编辑

    这是一个例子;它不使用链式 TM,但这不应该有所作为。

    @SpringBootApplication
    public class So66018178Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66018178Application.class, args);
        }
    
        @KafkaListener(id = "so66018178", topics = "so66018178")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    
    spring.kafka.producer.transaction-id-prefix=tx-
    spring.kafka.consumer.auto-offset-reset=earliest
    
    @SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
    @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class So66018178ApplicationTests {
    
        @Autowired
        EmbeddedKafkaBroker broker;
    
        @Test
        void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
                throws InterruptedException {
    
            registry.getListenerContainer("so66018178").stop();
            AtomicReference<Exception> listenerException = new AtomicReference<>();
            CountDownLatch latch = new CountDownLatch(1);
            ((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
                    .setAfterRollbackProcessor(new AfterRollbackProcessor<>() {
    
                        @Override
                        public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
                                Exception exception, boolean recoverable) {
    
                            listenerException.set(exception);
                            latch.countDown();
                        }
                    });
            registry.getListenerContainer("so66018178").start();
    
            Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
            template.send("so66018178", "test");
            assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
            assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
                    .hasCause(config.exception);
        }
    
        @Configuration
        public static class Config {
    
            RuntimeException exception = new RuntimeException("test");
    
            @Bean
            public ProducerFactory<Object, Object> pf() {
                return new ProducerFactory<>() {
    
                    @Override
                    public Producer<Object, Object> createProducer() {
                        MockProducer<Object, Object> mockProducer = new MockProducer<>();
                        mockProducer.commitTransactionException = Config.this.exception;
                        return mockProducer;
                    }
    
                    @Override
                    public Producer<Object, Object> createProducer(String txIdPrefix) {
                        Producer<Object, Object> producer = createProducer();
                        producer.initTransactions();
                        return producer;
                    }
    
                    @Override
                    public boolean transactionCapable() {
                        return true;
                    }
    
                };
            }
    
        }
    
    }
    

    【讨论】:

    • 感谢@Gary 的回答,我一直在尝试按照您的建议连接一个 MockProducer,但我还不能让它工作。如果我不打电话给mockProducer.initTransactions() 我会得到一个错误:MockProducer hasn't been initialized for transactions. 如果我打电话那么我会得到Could not create Kafka transaction ... Transaction already started ... at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1832) 知道我做错了什么吗?或者你知道我应该看看一个很好的例子吗?
    • 我添加了一个适合我的示例;当您调用initTransactions() 时,您可以编辑问题以显示完整的堆栈跟踪吗?
    • 使用你的例子,我设法让我的工作。我认为问题是由于我混淆了用于测试的生产工厂的接线和应用程序使用的接线。谢谢您的帮助! (以及我最近阅读的其他堆栈溢出问题中的所有其他答案)
    猜你喜欢
    • 2011-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-01
    • 2021-05-22
    • 1970-01-01
    • 2013-10-19
    • 1970-01-01
    相关资源
    最近更新 更多