【问题标题】:JTA Transactions Rollback of Routes using Directs?JTA 事务使用 Directs 回滚路由?
【发布时间】:2014-04-15 01:58:46
【问题描述】:

我有一个有趣的用例,我想知道是否有人对此有想法。

在数据库中基本上有一个通知表,它基本上具有以下结构:

DROP TABLE IF EXISTS "notifications";
CREATE TABLE "notifications" (
  "process_id" INT(11) NOT NULL DEFAULT '0',
  "table_name" VARCHAR(40) NOT NULL,
  "last_updated" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ,
  "last_processed" TIMESTAMP NOT NULL,
  PRIMARY KEY ("process_id")
);

INSERT INTO notifications (process_id, table_name, last_updated, last_processed) VALUES
(1, 'CASES_A', '2013-10-23 08:01:15+00:00','2013-10-22 08:30:22+00:00'),
(2, 'CASES_B', '2013-10-23 08:05:15+00:00','2013-10-22 08:05:15+00:00');

DROP TABLE IF EXISTS "CASES_A";
CREATE TABLE "CASES_A" (
  "case_id" VARCHAR(25),
  "case_date" TIMESTAMP NOT NULL,
  PRIMARY KEY ("case_id")
);

INSERT into CASES_A (case_id, case_date) VALUES
('5000NQLj451NJ1', '2013-10-22 18:33:25+00:00'),
('5000NQLj4992F1', '2013-11-05 17:19:02+00:00'),
('5000NQLj8N9J11', '2013-11-06 08:03:08+00:00');

DROP TABLE IF EXISTS "CASES_B";
CREATE TABLE "CASES_B" (
  "case_id" VARCHAR(25),
  "case_date" TIMESTAMP NOT NULL,
  PRIMARY KEY ("case_id")
);

INSERT into CASES_B (case_id, case_date) VALUES
('5000NQLk451NX4', '2013-10-21 10:23:26+00:00'),
('5000NQLk451NX5', '2013-10-20 11:10:25+00:00');

我们的想法是我们调用通知表的检查(在生产中使用石英完成),然后当有一条记录的最后更新日期大于最后处理日期时,我们读取 table_name 中的数据并处理记录,将它们路由到适当的队列,当我们路由所有记录时,我们更新 last_processed 日期。

用例有以下目标:

  1. 如果 table_name 中的单个记录处理失败,它会被发送到死信队列但不会回滚。
  2. 如果一条记录被发送到路由队列,但在子路由完成之前抛出异常,则应该从路由队列中删除该记录并发送到死亡。
  3. 如果上次处理时间的更新出现错误,所有已处理的记录都会从它们路由到的队列中删除。
  4. 如果 table_name 中的任何记录无法直接获取,则该事件中的所有记录都应中止且 last_processed 不更新。

为了尝试实施这种交易策略,我有以下路线和测试用例(注意我有与问题无关的缩写项目。

package com.ea.wwce.camel.test.utilities;

import com.ea.wwce.camel.utilities.data.Record;
import com.ea.wwce.camel.utilities.data.RecordList;
import com.ea.wwce.camel.utilities.expressions.JodaDateTimeNow;
import com.ea.wwce.camel.utilities.jackson.RecordSerialization;
import com.ea.wwce.camel.utilities.transactions.TxnHelper;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.sql.Connection;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sql.SqlConstants;
import org.apache.camel.impl.JndiRegistry;
import org.testng.annotations.Test;
import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
import static com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
import static com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
import static org.apache.camel.ExchangePattern.InOnly;

/** Test Composite Transactions. */
public class CompositeTransactionTest extends AMQRouteTestSupport {
  public static final String MOCK_NOTIFICATION_END = "mock:notification_end";
  public static final String MOCK_AFTER_NOTIFICATION_SPLIT = "mock:after_notification_split";
  public static final String MOCK_CASE_SPLIT_END = "mock:case_split_end";
  public static final String MOCK_BEFORE_CASE_END = "mock:before_case_end";
  public static final String MOCK_ROUTED = "mock:routed";
  public static final String MOCK_AFTER_CASE_END = "mock:after_case_end";

  public static final String QUEUE_DEAD = endpointAMQ("dead");
  public static final String QUEUE_ROUTING = endpointAMQ("routing");
  public static final String QUEUE_TRIGGER = endpointAMQ("trigger");
  public static final String DIRECT_DO_CASE = "direct:do_case";
  public static final String DIRECT_DO_NOTIFICATION = "direct:do_notification";

  /** The database support object. */
  private H2DatabaseSupport dbSupport = createDBSupport(DBOnlyTransactionTest.class.getSimpleName());

  /** Jackson data format. */
  private JacksonDataFormat df = new JacksonDataFormat(createMapper(), Record.class);

  /** Jackson Mapper. */
  private ObjectMapper mapper = createMapper();

  /** Mock Endpoints. */
  private MockEndpoint mockDead, mockBeforeCaseEnd, mockAfterNotificationSplit, mockNotificationEnd, mockCaseSplitEnd, mockRouted, mockAfterCaseEnd;

  /** Creates a jackson mapper. */
  private static ObjectMapper createMapper() {
    final JsonFactory factory = new JsonFactory();
    factory.enable(JsonParser.Feature.ALLOW_COMMENTS);
    final ObjectMapper mapper = new ObjectMapper(factory);
    mapper.registerModule(RecordSerialization.createJacksonModule(Record.class, null));
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    mapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
    return mapper;
  }

  public void initMocks() {
    mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
    mockRouted = assertAndGetMockEndpoint(MOCK_ROUTED);
    mockBeforeCaseEnd = assertAndGetMockEndpoint(MOCK_BEFORE_CASE_END);
    mockAfterCaseEnd =  assertAndGetMockEndpoint(MOCK_AFTER_CASE_END);
    mockAfterNotificationSplit = assertAndGetMockEndpoint(MOCK_AFTER_NOTIFICATION_SPLIT);
    mockNotificationEnd = assertAndGetMockEndpoint(MOCK_NOTIFICATION_END);
    mockCaseSplitEnd = assertAndGetMockEndpoint(MOCK_CASE_SPLIT_END);
  }

  @Override
  protected void afterRegistryCreation(final JndiRegistry registry) throws Exception {
    dbSupport.registerNewDatasource(registry, DS_JNDI_KEY);
  }

  @Override
  protected RouteBuilder createRouteBuilder() {
    System.out.println("createRouteBuilder");
    return new RouteBuilder(this.context) {
      @Override
      public void configure() {
        context.setTracing(true);
        from(DIRECT_START).to(QUEUE_TRIGGER);
        from(QUEUE_TRIGGER).routeId(ROUTE_ID_TEST_ROUTE)
            .onException(RuntimeException.class).handled(true).useOriginalMessage().marshal(df).to(QUEUE_DEAD).markRollbackOnly().end()
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .setHeader(SqlConstants.SQL_QUERY, simple(SQL_CHECK))
            .to("sql:?dataSource=" + DS_JNDI_KEY)
            .convertBodyTo(RecordList.class)
            .split(body()) // Split checks
            .to(DIRECT_DO_NOTIFICATION)
            .end(); // end spit of checks;

        from(DIRECT_DO_NOTIFICATION)
            .onException(RuntimeException.class).handled(true).useOriginalMessage().marshal(df).to(QUEUE_DEAD).markRollbackOnly().end()
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .to(MOCK_AFTER_NOTIFICATION_SPLIT)
            .setHeader(HDR_PROC, simple("${body[process_id]}"))
            .setHeader(HDR_NOW, new JodaDateTimeNow("GMT", "yyyy-MM-dd HH:mm:ss+00:00"))
            .setHeader(HDR_TABLE, simple("${body[table_name]}"))
            .setHeader(SqlConstants.SQL_QUERY, simple(SQL_CASES))
            .to("sql:?dataSource=" + DS_JNDI_KEY)
            .convertBodyTo(RecordList.class)
            .split(body()) // split cases
            .to(DIRECT_DO_CASE)
            .end() // end split of process cases
            .to(MOCK_CASE_SPLIT_END)
            .setHeader(SqlConstants.SQL_QUERY, simple(SQL_UPDATE))
            .to("sql:?dataSource=" + DS_JNDI_KEY)
            .to(MOCK_NOTIFICATION_END);

        from(DIRECT_DO_CASE)
            .onException(RuntimeException.class).handled(true).useOriginalMessage().marshal(df).to(QUEUE_DEAD).end()
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .marshal(df)
            .to(MOCK_BEFORE_CASE_END)
            .to(QUEUE_ROUTING)
            .to(MOCK_AFTER_CASE_END);

        from(QUEUE_ROUTING).to(MOCK_ROUTED);
        from(QUEUE_DEAD).to(MOCK_DEAD);
      }
    };
  }

  @Test
  public void testSingleCaseFailIsIsolatedAfterEnqueue() throws Exception {
    try (final Connection connection = dbSupport.connectH2WithInitScripts()) {
      startCamelContext();
      initMocks();
      final RecordList notifications = notifications();
      final RecordList casesA = casesA();
      final RecordList casesB = casesB();

      mockAfterNotificationSplit.expectedBodiesReceivedInAnyOrder(notifications);
      //noinspection RedundantCast
      mockCaseSplitEnd.expectedBodiesReceivedInAnyOrder((Object) casesA, (Object) casesB);
      mockRouted.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, casesA.get(1), casesA.get(2), casesB.get(0), casesB.get(1)));
      mockBeforeCaseEnd.expectedMessageCount(5);
      mockAfterCaseEnd.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
      //noinspection RedundantCast
      mockNotificationEnd.expectedBodiesReceivedInAnyOrder((Object) casesA, (Object) casesB);
      mockDead.expectedMessageCount(1);
      mockDead.expectedBodiesReceived(toListOfJsonStrings(mapper, casesA().get(0)));

      template.sendBody(DIRECT_START, ""); // trigger process

      mockBeforeCaseEnd.assertIsSatisfied();
      mockAfterNotificationSplit.assertIsSatisfied();
      mockDead.assertIsSatisfied();
      mockRouted.assertIsSatisfied();
      mockNotificationEnd.assertIsSatisfied();

      assertFalse(isProcessTimeUnchanged(connection, notifications().get(0)));
      assertFalse(isProcessTimeUnchanged(connection, notifications().get(1)));
    }
  }
}

问题是测试用例 testSingleCaseFailIsIsolatedAfterEnqueue 不起作用。测试无法回滚排队的消息,并且似乎消息在事务提交之前已经路由。有谁知道如何解决这个问题?我一直在为这个测试用例付出巨大的努力,我想知道 AMQ 似乎没有参与事务,但一个测试用例中只有 AMQ 而数据库不能正常工作。

非常感谢您的意见或想法。


编辑

有趣的是,AMQ 似乎没有参与 JTA 事务。我不知道为什么会在这个时候发生。这是事务管理器的设置:

final Properties txnSvcProps = new Properties();
txnSvcProps.setProperty("com.atomikos.icatch.service", "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
txnSvcProps.setProperty("com.atomikos.icatch.output_dir", "./target/atomikos/");
txnSvcProps.setProperty("com.atomikos.icatch.log_base_dir", "./target/atomikos/");
txnSvcProps.setProperty("com.atomikos.icatch.console_log_level", "DEBUG");
txnSvcProps.setProperty("com.atomikos.icatch.tm_unique_name", this.txnManagerServiceName);
/* The Atomikos user transaction service. */
atomikosUserTxnService = new UserTransactionServiceImp(txnSvcProps);
atomikosUserTxnService.init();
this.atomikosTxMgr = new UserTransactionManager();
this.atomikosTxMgr.setStartupTransactionService(false); // already started above.
this.atomikosTxMgr.setForceShutdown(false);
try {
  this.atomikosTxMgr.init();
} catch (final SystemException ex) {
  throw new AssertionError("Error initializing JTA transaction manager", ex);
}
this.userTxn = new UserTransactionImp();

这里是 AMQ 设置:

final ActiveMQXAConnectionFactory amqcf = registry.lookupByNameAndType(amqCFJndiName, ActiveMQXAConnectionFactory.class);
final ActiveMQComponent amq = (ActiveMQComponent) context.getComponent("activemq");
amq.setConnectionFactory(amqcf);
// -- Set Transaction manager because we will be using transacted(); note that this will find the correct one on the host platform.
amq.setTransactionManager(txnMgr);
// -- If we are using a JMSTransactionManager instance we have to set the connection factory in the manager.
if (txnMgr instanceof JmsTransactionManager) ((JmsTransactionManager) txnMgr).setConnectionFactory(amqcf);

【问题讨论】:

  • 刚刚添加了新信息。

标签: transactions apache-camel jta


【解决方案1】:

尝试在 processEvent 路由中交换 transacted/onException 的顺序。有一条规则说(Camel in Action §9.2.2):

在 Java DSL 中使用 transacted() 时,必须在后面加上 from() 以确保正确配置路由以使用 交易。

您是否使用 JTA 事务管理器并且 ActiveMQ 是否也参与事务?

【讨论】:

  • 这实际上没有任何作用。我试过了。
  • 什么是TxnHelper.KEY_TXNPOLICY_REQUIRED?您使用的是哪个事务管理器?可以启用事务日志(org.springframework.transaction 级别为 DEBUG)吗?
  • TxnHelper.KEY_TXNPOLICY_REQUIRED 只是一个字符串常量,其中包含已注册的事务策略的 JNDI 名称。至于日志,我正在准备一个非专有示例。我会尽快编辑。
【解决方案2】:

尝试按照指定的here.end() 添加到onException() 子句。

【讨论】:

  • 实际上我确实添加了这些。我只是犯了一个复制粘贴错误。实际路线更大,并且其中包含专有的东西。我将编辑代码并修复它。
【解决方案3】:

我发现了问题。使用 Atomikos 时,您必须将连接工厂包装在它们自己的 bean 中。添加这样的代码可以解决问题:

@Override
protected CamelContext createCamelContext() throws Exception {
  final CamelContext camelContext = super.createCamelContext();
  if (log.isDebugEnabled()) log.debug("Registering AMQ connection factory using key: {}", CONNECTION_FACTORY_JNDI_NAME);
  // -- create a connection factory proxy for atomikos.
  final AtomikosConnectionFactoryBean acfb = new AtomikosConnectionFactoryBean();
  acfb.setUniqueResourceName("amq-embedded");
  acfb.setXaConnectionFactory(brokerSupport.amqcf);
  acfb.setMaxPoolSize(10);
  acfb.init();
  registry().bind(CONNECTION_FACTORY_JNDI_NAME, acfb);
  ActiveMQHelper.configureActiveMQ(camelContext, registry(), fetchPlatformTxnMgr());
  return camelContext;
}

这段代码将连接工厂包装在 atomikos bean 中,从而解决了现在路由查找连接工厂时得到的 bean 并且所有内容都已排序的问题。

【讨论】:

    猜你喜欢
    • 2011-12-10
    • 2016-12-27
    • 2012-04-13
    • 1970-01-01
    • 1970-01-01
    • 2013-01-19
    • 2013-06-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多