【发布时间】:2014-04-15 01:58:46
【问题描述】:
我有一个有趣的用例,我想知道是否有人对此有想法。
数据库中有一张通知表,其结构大致如下:
-- Fixture script: recreates the notification table and the two case tables it refers to,
-- then seeds each of them with sample rows.

-- Notification table: one row per process, naming the table to read and carrying the
-- last_updated / last_processed timestamps.
DROP TABLE IF EXISTS "notifications";
CREATE TABLE "notifications" (
"process_id" INT(11) NOT NULL DEFAULT '0',
"table_name" VARCHAR(40) NOT NULL,
"last_updated" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ,
"last_processed" TIMESTAMP NOT NULL,
PRIMARY KEY ("process_id")
);
-- Seed rows: in both rows last_updated is later than last_processed.
-- NOTE(review): the tables are created with quoted identifiers but referenced unquoted in the
-- INSERT statements; confirm the target database resolves both spellings to the same table.
INSERT INTO notifications (process_id, table_name, last_updated, last_processed) VALUES
(1, 'CASES_A', '2013-10-23 08:01:15+00:00','2013-10-22 08:30:22+00:00'),
(2, 'CASES_B', '2013-10-23 08:05:15+00:00','2013-10-22 08:05:15+00:00');
-- Case table referenced by notification 1 (table_name = 'CASES_A').
DROP TABLE IF EXISTS "CASES_A";
CREATE TABLE "CASES_A" (
"case_id" VARCHAR(25),
"case_date" TIMESTAMP NOT NULL,
PRIMARY KEY ("case_id")
);
-- Three sample cases for CASES_A.
INSERT into CASES_A (case_id, case_date) VALUES
('5000NQLj451NJ1', '2013-10-22 18:33:25+00:00'),
('5000NQLj4992F1', '2013-11-05 17:19:02+00:00'),
('5000NQLj8N9J11', '2013-11-06 08:03:08+00:00');
-- Case table referenced by notification 2 (table_name = 'CASES_B'); same layout as CASES_A.
DROP TABLE IF EXISTS "CASES_B";
CREATE TABLE "CASES_B" (
"case_id" VARCHAR(25),
"case_date" TIMESTAMP NOT NULL,
PRIMARY KEY ("case_id")
);
-- Two sample cases for CASES_B.
INSERT into CASES_B (case_id, case_date) VALUES
('5000NQLk451NX4', '2013-10-21 10:23:26+00:00'),
('5000NQLk451NX5', '2013-10-20 11:10:25+00:00');
我们的想法是:定期检查通知表(生产环境中由 Quartz 调度完成);当某条记录的 last_updated 晚于 last_processed 时,读取 table_name 所指向的表中的数据并处理这些记录,将它们路由到相应的队列;所有记录路由完成后,再更新 last_processed 日期。
用例有以下目标:
- 如果 table_name 中的单个记录处理失败,它会被发送到死信队列但不会回滚。
- 如果一条记录已被发送到路由队列,但在子路由完成之前抛出了异常,则应将该记录从路由队列中移除并发送到死信队列。
- 如果上次处理时间的更新出现错误,所有已处理的记录都会从它们路由到的队列中删除。
- 如果 table_name 中的任何记录无法直接获取,则该事件中的所有记录都应中止且 last_processed 不更新。
为了尝试实现这种事务策略,我编写了以下路由和测试用例(注意:我省略了与问题无关的部分)。
package com.ea.wwce.camel.test.utilities;
import com.ea.wwce.camel.utilities.data.Record;
import com.ea.wwce.camel.utilities.data.RecordList;
import com.ea.wwce.camel.utilities.expressions.JodaDateTimeNow;
import com.ea.wwce.camel.utilities.jackson.RecordSerialization;
import com.ea.wwce.camel.utilities.transactions.TxnHelper;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.sql.Connection;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sql.SqlConstants;
import org.apache.camel.impl.JndiRegistry;
import org.testng.annotations.Test;
import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
import static com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
import static com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
import static org.apache.camel.ExchangePattern.InOnly;
/** Test Composite Transactions. */
public class CompositeTransactionTest extends AMQRouteTestSupport {
public static final String MOCK_NOTIFICATION_END = "mock:notification_end";
public static final String MOCK_AFTER_NOTIFICATION_SPLIT = "mock:after_notification_split";
public static final String MOCK_CASE_SPLIT_END = "mock:case_split_end";
public static final String MOCK_BEFORE_CASE_END = "mock:before_case_end";
public static final String MOCK_ROUTED = "mock:routed";
public static final String MOCK_AFTER_CASE_END = "mock:after_case_end";
public static final String QUEUE_DEAD = endpointAMQ("dead");
public static final String QUEUE_ROUTING = endpointAMQ("routing");
public static final String QUEUE_TRIGGER = endpointAMQ("trigger");
public static final String DIRECT_DO_CASE = "direct:do_case";
public static final String DIRECT_DO_NOTIFICATION = "direct:do_notification";
/** The database support object. */
private H2DatabaseSupport dbSupport = createDBSupport(DBOnlyTransactionTest.class.getSimpleName());
/** Jackson data format. */
private JacksonDataFormat df = new JacksonDataFormat(createMapper(), Record.class);
/** Jackson Mapper. */
private ObjectMapper mapper = createMapper();
/** Mock Endpoints. */
private MockEndpoint mockDead, mockBeforeCaseEnd, mockAfterNotificationSplit, mockNotificationEnd, mockCaseSplitEnd, mockRouted, mockAfterCaseEnd;
/** Creates a jackson mapper. */
private static ObjectMapper createMapper() {
final JsonFactory factory = new JsonFactory();
factory.enable(JsonParser.Feature.ALLOW_COMMENTS);
final ObjectMapper mapper = new ObjectMapper(factory);
mapper.registerModule(RecordSerialization.createJacksonModule(Record.class, null));
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
mapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
return mapper;
}
public void initMocks() {
mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
mockRouted = assertAndGetMockEndpoint(MOCK_ROUTED);
mockBeforeCaseEnd = assertAndGetMockEndpoint(MOCK_BEFORE_CASE_END);
mockAfterCaseEnd = assertAndGetMockEndpoint(MOCK_AFTER_CASE_END);
mockAfterNotificationSplit = assertAndGetMockEndpoint(MOCK_AFTER_NOTIFICATION_SPLIT);
mockNotificationEnd = assertAndGetMockEndpoint(MOCK_NOTIFICATION_END);
mockCaseSplitEnd = assertAndGetMockEndpoint(MOCK_CASE_SPLIT_END);
}
@Override
protected void afterRegistryCreation(final JndiRegistry registry) throws Exception {
dbSupport.registerNewDatasource(registry, DS_JNDI_KEY);
}
@Override
protected RouteBuilder createRouteBuilder() {
System.out.println("createRouteBuilder");
return new RouteBuilder(this.context) {
@Override
public void configure() {
context.setTracing(true);
from(DIRECT_START).to(QUEUE_TRIGGER);
from(QUEUE_TRIGGER).routeId(ROUTE_ID_TEST_ROUTE)
.onException(RuntimeException.class).handled(true).useOriginalMessage().marshal(df).to(QUEUE_DEAD).markRollbackOnly().end()
.transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
.setHeader(SqlConstants.SQL_QUERY, simple(SQL_CHECK))
.to("sql:?dataSource=" + DS_JNDI_KEY)
.convertBodyTo(RecordList.class)
.split(body()) // Split checks
.to(DIRECT_DO_NOTIFICATION)
.end(); // end spit of checks;
from(DIRECT_DO_NOTIFICATION)
.onException(RuntimeException.class).handled(true).useOriginalMessage().marshal(df).to(QUEUE_DEAD).markRollbackOnly().end()
.transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
.to(MOCK_AFTER_NOTIFICATION_SPLIT)
.setHeader(HDR_PROC, simple("${body[process_id]}"))
.setHeader(HDR_NOW, new JodaDateTimeNow("GMT", "yyyy-MM-dd HH:mm:ss+00:00"))
.setHeader(HDR_TABLE, simple("${body[table_name]}"))
.setHeader(SqlConstants.SQL_QUERY, simple(SQL_CASES))
.to("sql:?dataSource=" + DS_JNDI_KEY)
.convertBodyTo(RecordList.class)
.split(body()) // split cases
.to(DIRECT_DO_CASE)
.end() // end split of process cases
.to(MOCK_CASE_SPLIT_END)
.setHeader(SqlConstants.SQL_QUERY, simple(SQL_UPDATE))
.to("sql:?dataSource=" + DS_JNDI_KEY)
.to(MOCK_NOTIFICATION_END);
from(DIRECT_DO_CASE)
.onException(RuntimeException.class).handled(true).useOriginalMessage().marshal(df).to(QUEUE_DEAD).end()
.transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
.marshal(df)
.to(MOCK_BEFORE_CASE_END)
.to(QUEUE_ROUTING)
.to(MOCK_AFTER_CASE_END);
from(QUEUE_ROUTING).to(MOCK_ROUTED);
from(QUEUE_DEAD).to(MOCK_DEAD);
}
};
}
@Test
public void testSingleCaseFailIsIsolatedAfterEnqueue() throws Exception {
try (final Connection connection = dbSupport.connectH2WithInitScripts()) {
startCamelContext();
initMocks();
final RecordList notifications = notifications();
final RecordList casesA = casesA();
final RecordList casesB = casesB();
mockAfterNotificationSplit.expectedBodiesReceivedInAnyOrder(notifications);
//noinspection RedundantCast
mockCaseSplitEnd.expectedBodiesReceivedInAnyOrder((Object) casesA, (Object) casesB);
mockRouted.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, casesA.get(1), casesA.get(2), casesB.get(0), casesB.get(1)));
mockBeforeCaseEnd.expectedMessageCount(5);
mockAfterCaseEnd.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
//noinspection RedundantCast
mockNotificationEnd.expectedBodiesReceivedInAnyOrder((Object) casesA, (Object) casesB);
mockDead.expectedMessageCount(1);
mockDead.expectedBodiesReceived(toListOfJsonStrings(mapper, casesA().get(0)));
template.sendBody(DIRECT_START, ""); // trigger process
mockBeforeCaseEnd.assertIsSatisfied();
mockAfterNotificationSplit.assertIsSatisfied();
mockDead.assertIsSatisfied();
mockRouted.assertIsSatisfied();
mockNotificationEnd.assertIsSatisfied();
assertFalse(isProcessTimeUnchanged(connection, notifications().get(0)));
assertFalse(isProcessTimeUnchanged(connection, notifications().get(1)));
}
}
}
问题是测试用例 testSingleCaseFailIsIsolatedAfterEnqueue 不起作用。测试无法回滚排队的消息,并且似乎消息在事务提交之前已经路由。有谁知道如何解决这个问题?我一直在为这个测试用例付出巨大的努力,我想知道 AMQ 似乎没有参与事务,但一个测试用例中只有 AMQ 而数据库不能正常工作。
非常感谢您的意见或想法。
编辑
有趣的是,AMQ 似乎没有参与 JTA 事务。我不知道为什么会在这个时候发生。这是事务管理器的设置:
// Fragment: sets up the Atomikos user transaction service, the JTA transaction manager and the
// user transaction object. The enclosing method and field declarations are not shown.

// Configuration properties handed to the Atomikos user transaction service.
final Properties txnSvcProps = new Properties();
txnSvcProps.setProperty("com.atomikos.icatch.service", "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
// Both the output and the log base directory point at ./target/atomikos/.
txnSvcProps.setProperty("com.atomikos.icatch.output_dir", "./target/atomikos/");
txnSvcProps.setProperty("com.atomikos.icatch.log_base_dir", "./target/atomikos/");
txnSvcProps.setProperty("com.atomikos.icatch.console_log_level", "DEBUG");
txnSvcProps.setProperty("com.atomikos.icatch.tm_unique_name", this.txnManagerServiceName);
/* The Atomikos user transaction service. */
atomikosUserTxnService = new UserTransactionServiceImp(txnSvcProps);
atomikosUserTxnService.init();
// The transaction manager is told not to start the transaction service itself.
this.atomikosTxMgr = new UserTransactionManager();
this.atomikosTxMgr.setStartupTransactionService(false); // already started above.
this.atomikosTxMgr.setForceShutdown(false);
// An initialization failure is rethrown as an AssertionError that keeps the original cause.
try {
this.atomikosTxMgr.init();
} catch (final SystemException ex) {
throw new AssertionError("Error initializing JTA transaction manager", ex);
}
this.userTxn = new UserTransactionImp();
这里是 AMQ 设置:
// Fragment: configures the Camel "activemq" component with an XA connection factory and a
// transaction manager. The enclosing method and the declarations of registry, context, txnMgr and
// amqCFJndiName are not shown.

// Look up the XA connection factory from the registry and hand it to the component as-is.
// NOTE(review): the raw ActiveMQXAConnectionFactory is passed directly; confirm whether it has to
// be wrapped in a JTA-aware pooling connection factory so that its sessions are enlisted in the
// JTA transaction - this may be why AMQ does not appear to participate.
final ActiveMQXAConnectionFactory amqcf = registry.lookupByNameAndType(amqCFJndiName, ActiveMQXAConnectionFactory.class);
final ActiveMQComponent amq = (ActiveMQComponent) context.getComponent("activemq");
amq.setConnectionFactory(amqcf);
// -- Set Transaction manager because we will be using transacted(); note that this will find the correct one on the host platform.
// NOTE(review): only the transaction manager is set here; confirm whether the component also
// needs its transacted mode enabled explicitly.
amq.setTransactionManager(txnMgr);
// -- If we are using a JMSTransactionManager instance we have to set the connection factory in the manager.
if (txnMgr instanceof JmsTransactionManager) ((JmsTransactionManager) txnMgr).setConnectionFactory(amqcf);
【问题讨论】:
-
刚刚添加了新信息。
标签: transactions apache-camel jta