【发布时间】:2021-02-02 21:54:58
【问题描述】:
我的 Spring Boot 服务需要消耗一个主题的 kafka 事件,进行一些处理(包括使用 JPA 写入数据库),然后产生一些关于新主题的事件。无论发生什么情况,我都不能在不更新数据库的情况下发布事件,如果出现任何问题,我希望消费者的下一次轮询重试该事件。我的处理逻辑(包括数据库更新)是幂等的,所以重试就可以了
我认为我已经通过使用像这样的 ChainedKafkaTransactionManager 实现了 https://docs.spring.io/spring-kafka/reference/html/#exactly-once 中描述的恰好一次语义:
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager(kafka, jpa);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
ChainedKafkaTransactionManager chainedTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setTransactionManager(chainedTransactionManager);
return factory;
}
我的 application.yaml 文件中相关的 kafka 配置如下所示:
kafka:
...
consumer:
group-id: myGroupId
auto-offset-reset: earliest
properties:
isolation.level: read_committed
...
producer:
transaction-id-prefix: ${random.uuid}
...
因为提交顺序对我的应用程序至关重要,我想编写一个集成测试来证明提交以所需的顺序发生,并且如果在提交到 kafka 期间发生错误,那么原始事件会再次被消耗。但是,我正在努力寻找一种导致 db 提交和 kafka 提交之间失败的好方法。
我有什么建议或替代方法吗?
谢谢
【问题讨论】:
标签: spring spring-boot transactions spring-kafka