【发布时间】:2019-01-14 15:08:46
【问题描述】:
我们有一个在功能强大的多核服务器上运行的导入程序。但是,我们的 Apache Camel 路由是单线程的,这很可惜。
我们的 [camel] 导入器是一个单实例程序。如何使特定路由使用多个线程处理消息?消息是原子的,由 bean 处理,bean 已经以线程安全的方式执行此操作。 如果我可以在线程中处理批处理 (maxMessagesPerPoll) 并且在下一次轮询发生之前有空闲时间(毕竟,这仍然比顺序处理更好),我已经很高兴了。
这是我想做多线程的路线之一:
public void onConfigure() throws Exception {
// This is a JPA query which selects all unprocessed modules
String query = RouteQueryHelper.selectNextUnprocessedStaged(ImportAction.IMPORT_MODULES);
from("jpa:com.so.importer.entity.ModuleStageEntity" +
"?consumer.query=" + query +
"&maxMessagesPerPoll=2000" +
"&consumeLockEntity=false" +
"&consumer.delay=1000" +
"&consumeDelete=false")
.transacted().policy("CAMEL_DEFAULT_POLICY")
.bean(moduleImportService) // processes the entity and updates it's status flag
.to("log:import-module?groupInterval=10000")
.routeId("so.route.import-module");
}
路由有consumeDelete=false,因为我们在实体上使用了状态属性(已修改并保存)。在consumer.query 中也尊重status 属性。
我们在 Java 8 上的 spring boot (1.3.8.RELEASE) 中使用骆驼版本 2.17.1。
编辑 2019 年 1 月 21 日:实体有一个带有 @Consumed 的方法,该方法在处理实体后将其推送到下一条路线:
@Consumed
public void gotoNextStatus() {
switch (stageStatus) {
case STAGED: setStageStatus(StageStatus.IMPORTED); break;
case IMPORTED: setStageStatus(StageStatus.RENDERED); break;
case RENDERED: setStageStatus(StageStatus.DONE); break;
}
}
【问题讨论】:
标签: apache-camel