【问题标题】:Spring Integration transaction for subflow of router路由器子流的Spring集成事务
【发布时间】:2021-01-05 11:45:44
【问题描述】:

我有一个名为“creationChannel”的频道,它使用 MongoMessageStore 进行备份,如下所示:

  @Bean
  ChannelMessageStore messageStore() {
    return new MongoDbChannelMessageStore(mongoDatabaseFactory);
  }

  @Bean
  PollableChannel creationChannel(ChannelMessageStore messageStore) {
    return MessageChannels.queue("creationChannel", messageStore, "create").get();
  }

我想在我的流程中使用它,但我想确定,如果“createOrderHandler”工作正常,来自那里的消息将是只读的(同样适用于“updateOrderHandler”,但使用不同的通道)。

...some code here...
        .<HeadersElement, OperationType>route(
            payload -> route(payload),
            spec -> spec
                .transactional(transactionHandleMessageAdvice)
                .subFlowMapping(
                    OperationType.New,
                    sf -> sf
                        .channel("creationChannel")
                        .log(Level.DEBUG, "Creation of a new order", Message::getPayload)
                        .transform(Mapper::mapCreate)
                        .handle(createOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
                .subFlowMapping(
                    OperationType.Update,
                    sf -> sf
                        .channel("updateChannel")
                        .log(Level.DEBUG, "Update for existing order", Message::getPayload)
                        .transform(Mapper::mapUpdate)
                        .handle(updateOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
        )
...some code here...

我尝试像这样配置“transactionHandleMessageAdvice”:

  @Bean
  TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
    return new TransactionHandleMessageAdvice(transactionManager, new Properties());
  }

但在处理程序因异常而失败后,消息仍在从数据库中删除。

也许我应该为子流配置 Poller 并以某种方式使用 MongoTransactionManager 配置它?

【问题讨论】:

    标签: java spring spring-integration


    【解决方案1】:

    也许我应该为子流配置 Poller 并以某种方式使用 MongoTransactionManager 配置它?

    这是正确的假设。只要您有线程在流中移动(例如您的PollableChannel creationChannel),当前事务就会在消息放入存储时提交。当前线程中没有任何事情发生,因此,您使用该 .transactional(transactionHandleMessageAdvice) 开始的当前事务。

    要使读取事务化,您确实必须在.transform(Mapper::mapCreate) 端点上配置Poller。因此,来自该队列通道的每个轮询都将是事务性的,直到您再次转移到不同的线程。

    没有办法(而且绝对不能)让整个异步流事务化,因为事务与ThreadLocal 绑定,并且在调用堆栈返回到事务发起者的那一刻,它被提交或回滚.使用异步逻辑,我们只是打算从生产者端“发送并忘记”,让消费者在数据准备好时处理数据。这不是交易的设计目的。

    【讨论】:

    • 感谢您的回答,所以您的意思是 .channel("creationChannel") 之后的所有内容都将在单独的线程中执行,配置转换的正确方法是什么?像这样: .transform(Mapper::mapCreate, conf -> conf .transactional(transactionHandleMessageAdvice)) 并从路由器规范中删除 transactionHandleMessageAdvice?如果日志端点停留在通道和转换之间,它会成为问题吗?
    • log() 运算符不是端点 - 它是拦截器docs.spring.io/spring-integration/reference/html/…。您需要将事务添加到该 transform() 端点的 PollerSpec 中。现在,我相信您在某个地方有一个全局轮询器定义,可以让您的流程正常工作。但是,如果您希望轮询事务性,则需要更具体地确定轮询消息所通过的端点。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-04-04
    • 2018-06-15
    • 2018-09-28
    • 1970-01-01
    • 1970-01-01
    • 2021-11-23
    • 2019-07-05
    相关资源
    最近更新 更多