【问题标题】:How to use multithreading inside an JPA camel route如何在 JPA 骆驼路线中使用多线程
【发布时间】:2019-01-14 15:08:46
【问题描述】:

我们有一个在功能强大的多核服务器上运行的导入程序。但是,我们的 Apache Camel 路由是单线程的,这很可惜。

我们的 [camel] 导入器是一个单实例程序。如何使特定路由使用多个线程处理消息?消息是原子的,由 bean 处理,bean 已经以线程安全的方式执行此操作。 如果我可以在线程中处理批处理 (maxMessagesPerPoll) 并且在下一次轮询发生之前有空闲时间(毕竟,这仍然比顺序处理更好),我已经很高兴了。

这是我想做多线程的路线之一:

public void onConfigure() throws Exception {
    // This is a JPA query which selects all unprocessed modules
    String query = RouteQueryHelper.selectNextUnprocessedStaged(ImportAction.IMPORT_MODULES);

    from("jpa:com.so.importer.entity.ModuleStageEntity" +
            "?consumer.query=" + query +
            "&maxMessagesPerPoll=2000" +
            "&consumeLockEntity=false" +
            "&consumer.delay=1000" +
            "&consumeDelete=false")

        .transacted().policy("CAMEL_DEFAULT_POLICY")

        .bean(moduleImportService) // processes the entity and updates it's status flag
        .to("log:import-module?groupInterval=10000")

        .routeId("so.route.import-module");
}

路由有consumeDelete=false,因为我们在实体上使用了状态属性(已修改并保存)。在consumer.query 中也尊重status 属性。

我们在 Java 8 上的 spring boot (1.3.8.RELEASE) 中使用骆驼版本 2.17.1。

编辑 2019 年 1 月 21 日:实体有一个带有 @Consumed 的方法,该方法在处理实体后将其推送到下一条路线:

@Consumed
public void gotoNextStatus() {
    switch (stageStatus) {
        case STAGED: setStageStatus(StageStatus.IMPORTED); break;
        case IMPORTED: setStageStatus(StageStatus.RENDERED); break;
        case RENDERED: setStageStatus(StageStatus.DONE); break;
    }
}

【问题讨论】:

    标签: apache-camel


    【解决方案1】:

    您可以通过将消息发送到中间 SEDA 端点来引入一些异步:

    from("jpa:")
    ...
    .to("seda:intermediateStage")
    

    然后将真正的处理放在一个新的路由中,有 N 个并发 SEDA 消费者(默认为一个):

    from("seda:intermediateStage?concurrentConsumers=5")
    .process(...)
    

    【讨论】:

    • JPA 使用轮询:如果实体的处理速度比 jpa 拉的慢,这种模式不会导致实体被多次推送到 seda 路由吗?它以单路由模式工作,因为下一次轮询是在更新实体状态字段并且不再与查询匹配时完成的。我想这需要为“已准备好在 seda 中处理”引入一个新状态?
    • 我的假设是错误的,它不会多次推送到 seda 路由,但是下一个路由(监听状态字段下一个值)会在 seda 路由实际处理实体之前触发.
    • 此解决方案可能有效,但由于事务性问题,它需要一些严重的问题。一是懒惰的jpa实体不喜欢传递给其他线程。很抱歉,但我无法验证这个答案,因为解决问题的工作很可能比将 apache camel 代码重写到 spring 批处理导入器应用程序中更高。
    • @dube:如果你不能验证答案,你为什么接受它?
    • @Kukeltje 你希望我理解我两年前的推理吗? :) 可能是因为它似乎是正确的(其他地方也提到了相同的,一旦你知道要寻找什么),但整个环境让我们非常恼火,以至于我们用 spring batch 替换了它。从未回头
    猜你喜欢
    • 2018-09-05
    • 1970-01-01
    • 1970-01-01
    • 2011-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-01
    • 1970-01-01
    相关资源
    最近更新 更多