【问题标题】:Transaction Synchronization in spring boot with Database+ kafka exampleSpring Boot 中的事务同步与 Database+ kafka 示例
【发布时间】:2019-10-03 20:56:01
【问题描述】:

我想用 Spring Boot 编写一个新的应用程序,使用 MySQL + Mango 数据库和 Spring Kafka 消息传递。

我尝试使用 Many POC 来同步 Kafka 和 DB 之间的事务,但在某些情况下失败了,而且我搜索了许多存储库、博客以获取至少一个示例。我现在还没有得到任何例子。

如果有人给出至少一个示例或配置,那将是未来所有人的一个很好的参考。

【问题讨论】:

    标签: spring-boot spring-data-jpa spring-kafka


    【解决方案1】:

    给你...

    @SpringBootApplication
    public class So56170932Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56170932Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> template.executeInTransaction(t -> t.send("so56170932a", "foo"));
        }
    
        @Bean
        public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
                DataSourceTransactionManager dstm) {
    
            return new ChainedKafkaTransactionManager<>(ktm, dstm);
        }
    
        @Bean
        public DataSourceTransactionManager dstm(DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                ChainedKafkaTransactionManager<Object, Object> ctm) {
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            factory.getContainerProperties().setTransactionManager(ctm);
            return factory;
        }
    
        @Component
        public static class Listener {
    
            private final JdbcTemplate jdbcTemplate;
    
            private final KafkaTemplate<String, String> kafkaTemplate;
    
            public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
                this.jdbcTemplate = jdbcTemplate;
                this.kafkaTemplate = kafkaTemplate;
            }
    
            @KafkaListener(id = "so56170932a", topics = "so56170932a")
            public void listen1(String in) {
                this.kafkaTemplate.send("so56170932b", in.toUpperCase());
                this.jdbcTemplate.execute("insert into so56170932 (data) values ('" + in + "')");
            }
    
            @KafkaListener(id = "so56170932b", topics = "so56170932b")
            public void listen2(String in) {
                System.out.println(in);
            }
    
        }
    
        @Bean
        public NewTopic topicA() {
            return TopicBuilder.name("so56170932a").build();
        }
    
        @Bean
        public NewTopic topicB() {
            return TopicBuilder.name("so56170932b").build();
        }
    
    }
    

    spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
    spring.datasource.username=root
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.properties.isolation.level=read_committed
    
    spring.kafka.producer.transaction-id-prefix=tx-
    
    logging.level.org.springframework.transaction=trace
    logging.level.org.springframework.kafka.transaction=debug
    logging.level.org.springframework.jdbc=debug
    

    mysql> select * from so56170932;
    +------+
    | data |
    +------+
    | foo  |
    | foo  |
    | foo  |
    | foo  |
    | foo  |
    | foo  |
    | foo  |
    | foo  |
    | foo  |
    +------+
    9 rows in set (0.00 sec)
    

    【讨论】:

    • 感谢@GaryRussell 举例。我有 2 个问题。
    • 1.我在 docker 中为此应用程序创建多个实例,因此每个实例的事务前缀 id 应该相同还是不同? 2. 我要同步4个事务(DataSourceTransactionManager,PlatformTransactionManager - 默认为实体管理器,JpaTransactionManager 和 KafkaTransactionManager),可以吗?
    • 1.为了正确处理僵尸围栏,它们应该是相同的;实际的事务 id 由前缀、消费者组、主题和分区组成。然后,如果一个实例出现故障,Kafka 可以正确处理重新平衡后将一个分区移动到另一个实例。 2. 是的;你必须决定顺序;通常,您首先需要 Kafka(因此它的提交是最后一个),以避免在失败后丢失消息;您的代码将需要处理已提交到一个或多个数据库的重复交付的可能性。
    • 我在 docker 中有一个 Kafka 代理,如果我使用相同的 transaction-id-prefix 连接了多个 spring -boot 实例,那么它偶尔会给出producedFencedException,并且事务也将不起作用。但如果我给出不同的事务 ID 前缀,那么它的工作正常。为什么会这样?有什么我想念的吗!
    • 如果您在侦听器容器线程上生成消息,则transactional.id&lt;prefix&gt;&lt;group&gt;.&lt;topic&gt;.&lt;partition&gt;。由于一个分区不能分配给多个实例,transactional.ids 将是唯一的。如果您在容器线程的上下文之外生成消息,则 transactional.id(因此前缀)在实例之间必须是唯一的。如果您两者都做,您将需要 2 个不同的生产者工厂。如果这不能回答您的问题,请提出一个显示您的代码和配置的新问题。
    猜你喜欢
    • 2018-05-01
    • 2020-03-07
    • 2018-08-03
    • 2019-03-16
    • 1970-01-01
    • 1970-01-01
    • 2019-10-05
    • 1970-01-01
    • 2020-11-13
    相关资源
    最近更新 更多