【问题标题】:Atomikos + Spring + JMS read messages batch transactionallyAtomikos + Spring + JMS 以事务方式批量读取消息
【发布时间】:2013-02-14 16:22:58
【问题描述】:

我正在尝试使用 spring + atomikos 设计一个应用程序,该应用程序读取一批 N 条消息并将它们保存到单个事务中的数据库中。它必须是一批消息,因为数据只有在批量时才是一致的,即单个消息不足以进行一致事务的数据。此外,每条消息都有一个事务绝对会影响我的表现。这不是典型的 JMS + DB 应用程序,因此我很难在网上找到示例(我尝试使用 atomikos 网站上建议的 MessageListener,但每条消息创建一个事务)。使用 Spring 实现这一目标的最佳方法是什么? 谢谢

乔瓦尼

【问题讨论】:

    标签: spring transactions jms jta atomikos


    【解决方案1】:

    我认为您的问题与 Spring 或 Atomikos 无关。这更像是一个设计问题。

    如果您需要在单个事务中提交所有消息,请将它们全部合并到一个更大的结构中,然后再将完整消息发送到您的持久层。

    【讨论】:

    • 从技术上讲,应该可以使用 Spring+Atomikos 执行以下步骤:启动事务,顺序从队列中读取 N 条消息,将数据持久化(一个或更多)数据库,然后提交事务。显然我希望 Spring 来管理事务:启动/提交/回滚。我知道如何使用 @Transactional 批注在多个数据库中批量保存数据,但我不知道 JMS 队列如何成为同一事务的一部分。这是技术挑战:它与设计无关。
    • 关于将批次合并到一条消息中,我想到了这一点。鉴于我不确定这对我来说是否可行,是否有开箱即用的解决方案让 JMS 将消息“打包”在一起?
    • “我不知道 JMS 队列如何成为同一事务的一部分。”要将数据库和 JMS 都作为同一事务的一部分,您需要 XA 协议,即两阶段提交 en.wikipedia.org/wiki/Two-phase_commit_protocol 另请参阅此链接,了解如何使用 XA 使用 JMS 配置 Atomikos:atomikos.com/Documentation/… 这是关于 Atomikos 和 XA 的文档使用 JDBC 的协议:atomikos.com/Documentation/ConfiguringJdbc
    【解决方案2】:

    弄清楚我需要做什么。在我的spring配置文件中,我需要配置如下:

    • org.springframework.transaction.jta.JtaTransactionManager 类的实例(例如配置为使用 atomikos),其中注入了“transactionManager”和“userTransaction”依赖项,例如,com.atomikos.icatch.jta.UserTransactionManagercom.atomikos.icatch.jta.UserTransactionImp 类的实例;
    • <tx:annotation-driven transaction-manager="txManager"> 标记用于在客户端代码中使用注释驱动的事务;
    • 向事务管理器注册 JMS 连接工厂,例如,通过创建 com.atomikos.jms.AtomikosConnectionFactoryBean 的实例;
    • 队列对象,例如com.tibco.tibjms.TibjmsQueue 或类似的实例;
    • 一个带有依赖“defaultDestination”的 JmsTemplate 注入了先前创建的队列;
    • 向事务管理器注册数据库,例如,通过创建com.atomikos.jdbc.AtomikosDataSourceBean 的实例(或com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean 用于最后一个资源策略场景);
    • 一个或多个 SqlUpdate(或 BatchSqlUpdate 或 JdbcTemplate)实例(每个数据源一个),每个实例都具有注入先前定义的数据源的“dataSource”属性;
    • 一个自定义 DAO 类的实例,其中注入了先前定义的 JmsTemplate 和 SqlUpdate 对象:此类将具有一个或多个注解为 @Transactional 的方法,这些方法将使用 JmsTemplate 执行事务以接收消息,并使用 SqlUpdate(s) 来持久化它们在数据库中,例如,它可能是一个如下所示的类:

      public class MyDao{        
        JmsTemplate jmsTemplate; // getter/setter omitted for clarity
        BatchSqlUpdate sqlUpdate; // getter/setter omitted for clarity
      
        @Transactional
        public void persistMessages(int n){
          // map used for the sqlUpdate object
          Map<String, Object> params = new HashMap<>();
          // need to reset this since it's being reused
          sqlUpdate.reset();
          for(int i=0;i<n;i++){
            // retrieve a message synchronously
            Message msg = jmsTemplate.receive();
            // transform the message
            doSomeMagic(msg,params);
            // set parameters in the SQL
            sqlUpdate.updateByNamedParam(params);
          }
          // now the batch can be flushed
          sqlUpdate.flush();
        }
      
        private void doSomeMagic(Message msg, Map<String,Object> params){
          // implementation is application-dependent!
          // the only assumption is that somehow the
          // message can be used to set the named
          // parameters in the sqlUpdate object
        }
      }
      

    唯一值得注意的是DAO必须由spring管理,因为使用注解需要spring来创建代理。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-14
      • 1970-01-01
      • 2011-09-07
      • 2012-03-18
      相关资源
      最近更新 更多