【问题标题】:Is Apache Camel's idempotent consumer pattern scalable?Apache Camel 的幂等消费者模式是否可扩展?
【发布时间】:2014-07-23 11:32:43
【问题描述】:

我正在使用 Apache Camel 2.13.1 轮询一个数据库表,其中包含超过 300k 行。我希望使用Idempotent Consumer EIP 过滤已处理的行。

不过,我想知道该实现是否真的可扩展。我的骆驼上下文是:-

<camelContext xmlns="http://camel.apache.org/schema/spring">
        <route id="main">
        <from
            uri="sql:select * from transactions?dataSource=myDataSource&amp;consumer.delay=10000&amp;consumer.useIterator=true" />
        <transacted ref="PROPAGATION_REQUIRED" />
        <enrich uri="direct:invokeIdempotentTransactions" />                
        <!-- Any processors here will be executed on all messages -->
    </route>

    <route id="idempotentTransactions">
        <from uri="direct:invokeIdempotentTransactions" />
        <idempotentConsumer
            messageIdRepositoryRef="jdbcIdempotentRepository">
            <ognl>#{request.body.ID}</ognl>
            <!-- Anything here will only be executed for non-duplicates -->
            <log message="non-duplicate" />
            <to uri="stream:out" />
        </idempotentConsumer>
    </route>            
</camelContext>

似乎每 10 秒处理一次完整的 300k 行(通过 consumer.delay 参数),这似乎非常低效。我希望某种反馈循环作为模式的一部分,以便提供过滤器的查询可以利用已处理的行集。

但是,CAMEL_MESSAGEPROCESSED 表中的 messageid 列的模式为

 {1908988=null}

其中 1908988 是 request.body.ID 我已将 EIP 设置为 key on,因此这不容易合并到我的查询中。

有没有更好的方法将 CAMEL_MESSAGEPROCESSED 表用作我的 select 语句中的反馈循环,以便 SQL 服务器执行大部分负载?

更新:

所以,我后来发现是我的 ognl 代码导致了奇怪的消息 id 列值。改成

<el>${in.body.ID}</el>

已修复。所以,既然我有一个可用的 messageId 列,我现在可以将我的“来自”SQL 查询更改为

select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')

但我仍然认为我正在破坏 Idempotent Consumer EIP。

还有其他人这样做吗?有什么理由不这样做?

【问题讨论】:

  • 您的更新似乎解决了这个问题。只要您不每次都处理所有内容,我就看不出这种方法有什么问题。抱歉,我对 idempotentConsumer 还没有太多经验。

标签: sql apache-camel idempotent enterprise-integration


【解决方案1】:

是的,是的。但是您需要使用可扩展的存储来保存已处理的消息集。您可以使用 Hazelcast - http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html 或 Infinispan - http://java.dzone.com/articles/clustered-idempotent-consumer - 取决于您的堆栈中已经存在哪种解决方案。当然,JDBC 存储库也可以工作,但前提是它满足所选的性能标准。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-09
    • 2011-01-08
    • 2020-06-23
    • 1970-01-01
    相关资源
    最近更新 更多