【问题标题】:How to restrict transaction boundaries for JdbcPollingChannelAdapter如何限制 JdbcPollingChannelAdapter 的事务边界
【发布时间】:2018-12-18 22:22:39
【问题描述】:

我有一个 JdbcPollingChannelAdapter 定义如下:

@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
            "SELECT * FROM common_task where due_at <= NOW() and retries < order by due_at ASC FOR UPDATE SKIP LOCKED");
    jdbcPollingChannelAdapter.setMaxRowsPerPoll(1);
    jdbcPollingChannelAdapter.setUpdateSql("Update common_task set retries = :retries, due_at = due_at + interval '10 minutes' WHERE ID = (:id)");
    jdbcPollingChannelAdapter.setUpdatePerRow(true);
    jdbcPollingChannelAdapter.setRowMapper(this::mapRow);
    jdbcPollingChannelAdapter.setUpdateSqlParameterSourceFactory(this::updateParamSource);
    return jdbcPollingChannelAdapter;
}

为此的集成流程:

@Bean
public IntegrationFlow pollingFlow(MessageSource<Object> jdbcMessageSource) {
    return IntegrationFlows.from(jdbcMessageSource,
            c -> c.poller(Pollers.fixedRate(250, TimeUnit.MILLISECONDS)
                    .maxMessagesPerPoll(1)
                    .transactional()))
            .split()
            .channel(taskSourceChannel())
            .get();
}

服务激活器定义为

@ServiceActivator(inputChannel = "taskSourceChannel")
    public void doSomething(FooTask event) {
        //do something but ** not ** within the transaction of the poller.
    }      

集成流中的轮询器被定义为事务性的。根据我的理解,这将 1、在事务中执行选择查询和更新查询。 2. 它也会在同一个事务中执行doSomething()方法。

目标:我想做 1 而不是 2。我想在事务中进行选择和更新,以确保两者都发生。但是,我不想在同一个事务中执行 doSomething()。如果 doSomething() 中出现异常,我仍然希望保留在轮询期间所做的更新。我怎样才能做到这一点?

【问题讨论】:

    标签: spring-integration spring-integration-dsl


    【解决方案1】:

    这是通过简单的线程转移来完成的。所以,你只需要离开轮询线程,让它提交 TX 并在单独的线程中继续处理。

    根据您对.split() 的逻辑,最好在拆分后已经有新的线程处理,因此项目将由doSomething() 并行处理。

    只需使用ExecutorChannel 即可实现目标。由于您已经拥有该taskSourceChannel(),因此只需考虑基于一些托管ThreadPoolTaskExecutor 将其替换为ExecutorChannel

    在参考手册中查看更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-configuration-executorchannel

    及其 Javadocs。

    简单的 Java Configuration 变体是这样的:

        @Bean
        public MessageChannel taskSourceChannel() {
            return new ExecutorChannel(executor());
        }
    
        @Bean
        public Executor executor() {
            return new ThreadPoolTaskExecutor();
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-07-11
      • 1970-01-01
      • 2015-08-18
      • 2019-04-18
      • 2017-03-26
      • 2017-05-22
      • 2016-06-05
      相关资源
      最近更新 更多