【发布时间】:2013-11-14 08:31:10
【问题描述】:
外部模块向消息代理发送数千条消息。每条消息都有一个等于 5 秒的 TimeToLive 属性。另一个模块应该使用并处理所有消息。
从 Spring Integration 文档中,我发现 Staged Event-driven 架构(消费者)对负载的显着峰值做出更好的响应。
我当前的实现使用 EDA(甚至驱动架构),例如
<si:channel id="inputChannel"/>
<!-- get messages from PRESENCE_ENGINE queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="inputChannel" destination="sso" connection-factory="connectionFactory"
max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>
<si:service-activator id ="activatorClient" input-channel="inputChannel" ref="messageService" method="processMessage"/>
<bean id="messageService" class="com.my.messaging.MessageService"/>
<bean id="sso"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SSO" />
</bean>
很明显,负载很重,例如传入数千条消息, processMessage() 可能需要超过 5 秒。 MessageService 可能无法处理所有消息。
我的想法如下:
修改 processMessage() 以便消息而不是 处理的仅存储在 MongoDB 中。然后我可以处理 独立任务中的消息。在这种情况下 MongoDB 将用作缓存。
使用大量消费者(SEDA 模型)。 inputChannel 是直接通道。
- 异步处理消息,例如inputChannel 是一个队列通道,消息是异步处理的。
在做出决定之前,我想问一下哪种方案更有效。也许场景 2) 和 3) 提供了一种机制来满足我的要求,即所有消息都应该被处理,即使负载很重。
编辑:
我已经实现了场景 2,我每秒发送 1000 条消息。 这是使用不同参数的统计信息丢失了多少消息:
最大并发消费者;生存时间=5 秒。空闲消费者限制; # 已发送消息; # 收到的消息
10 ; Yes ; 1 ; 1001 ; 297
100 ; Yes ; 1 ; 1001 ; 861
150 ; Yes ; 1 ; 1001 ; 859
300 ; Yes ; 1 ; 1001 ; 861
300 ; No ; 1 ; 1001 ; 860
300 ; No ; 100 ; 1001 ; 1014
300 ; No ; 50 ; 1001 ; 1011
似乎 idle-consumer-limit 比 max-concurrent 消费者更积极地创建消费者。这是在这种情况下使用 idle-consumer-limit 的好方法吗?
这是我的发件人/消费者配置文件:
<!-- SENDER
Keep Alive Sender sends messages to backup server -->
<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>
<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage">
<bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
<si:poller fixed-rate="${sender.sendinterval}"></si:poller>
</si:inbound-channel-adapter>
<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
<bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>
<!-- Subscriber to a channel dispatcher, Send messages to JMS -->
<int-jms:outbound-channel-adapter explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}"
channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>
<bean id="pres"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="PRES" />
</bean>
<!-- RECEIVER -->
<si:channel id="receiveChannel"/>
<!-- get messages from PRES queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="receiveChannel" destination="presence" connection-factory="connectionFactory" idle-consumer-limit="50"
max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>
<si:service-activator id ="activatorClient" input-channel="receiveChannel" ref="messageService" method="processMessage"/>
<bean id="messageService" class="com.cache.MessageService"/>
【问题讨论】:
-
您似乎有一个奇怪的要求组合。有限的生存时间,加上所需的交付时间会给你带来麻烦。有什么理由让你无法消除生活的时间吗?或者将代理配置为将它们放入持久队列?
-
确实,这是我们可以省略的要求。但我们需要确保所有消息都被消费。我在场景 2 中的初始测试显示,在一分钟内发送的 1000 条消息仅收到 860 条(开启和关闭生存时间)。
-
这听起来很奇怪......消息代理都应该能够确保没有消息丢失。
标签: spring messaging spring-integration