【发布时间】:2014-07-23 11:32:43
【问题描述】:
我正在使用 Apache Camel 2.13.1 轮询一个数据库表,其中包含超过 300k 行。我希望使用Idempotent Consumer EIP 过滤已处理的行。
不过,我想知道该实现是否真的可扩展。我的骆驼上下文是:-
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route id="main">
<from
uri="sql:select * from transactions?dataSource=myDataSource&consumer.delay=10000&consumer.useIterator=true" />
<transacted ref="PROPAGATION_REQUIRED" />
<enrich uri="direct:invokeIdempotentTransactions" />
<!-- Any processors here will be executed on all messages -->
</route>
<route id="idempotentTransactions">
<from uri="direct:invokeIdempotentTransactions" />
<idempotentConsumer
messageIdRepositoryRef="jdbcIdempotentRepository">
<ognl>#{request.body.ID}</ognl>
<!-- Anything here will only be executed for non-duplicates -->
<log message="non-duplicate" />
<to uri="stream:out" />
</idempotentConsumer>
</route>
</camelContext>
似乎每 10 秒处理一次完整的 300k 行(通过 consumer.delay 参数),这似乎非常低效。我希望某种反馈循环作为模式的一部分,以便提供过滤器的查询可以利用已处理的行集。
但是,CAMEL_MESSAGEPROCESSED 表中的 messageid 列的模式为
{1908988=null}
其中 1908988 是 request.body.ID 我已将 EIP 设置为 key on,因此这不容易合并到我的查询中。
有没有更好的方法将 CAMEL_MESSAGEPROCESSED 表用作我的 select 语句中的反馈循环,以便 SQL 服务器执行大部分负载?
更新:
所以,我后来发现是我的 ognl 代码导致了奇怪的消息 id 列值。改成
<el>${in.body.ID}</el>
已修复。所以,既然我有一个可用的 messageId 列,我现在可以将我的“来自”SQL 查询更改为
select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')
但我仍然认为我正在破坏 Idempotent Consumer EIP。
还有其他人这样做吗?有什么理由不这样做?
【问题讨论】:
-
您的更新似乎解决了这个问题。只要您不每次都处理所有内容,我就看不出这种方法有什么问题。抱歉,我对 idempotentConsumer 还没有太多经验。
标签: sql apache-camel idempotent enterprise-integration