【问题标题】:How to handle heavy message load in Spring Integration?如何在 Spring Integration 中处理繁重的消息负载?
【发布时间】:2013-11-14 08:31:10
【问题描述】:

外部模块向消息代理发送数千条消息。每条消息都有一个等于 5 秒的 TimeToLive 属性。另一个模块应该使用并处理所有消息。

从 Spring Integration 文档中,我发现 Staged Event-driven 架构(消费者)对负载的显着峰值做出更好的响应。

我当前的实现使用 EDA(甚至驱动架构),例如

<si:channel id="inputChannel"/>

<!-- get messages from PRESENCE_ENGINE queue -->    
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" 
    channel="inputChannel" destination="sso" connection-factory="connectionFactory"  
    max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id ="activatorClient" input-channel="inputChannel" ref="messageService" method="processMessage"/> 

<bean id="messageService" class="com.my.messaging.MessageService"/>

<bean id="sso" 
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="SSO" />
</bean> 

很明显,负载很重,例如传入数千条消息, processMessage() 可能需要超过 5 秒。 MessageService 可能无法处理所有消息。

我的想法如下:

  1. 修改 processMessage() 以便消息而不是 处理的仅存储在 MongoDB 中。然后我可以处理 独立任务中的消息。在这种情况下 MongoDB 将用作缓存。

  2. 使用大量消费者(SEDA 模型)。 inputChannel 是直接通道。

  3. 异步处理消息,例如inputChannel 是一个队列通道,消息是异步处理的。

在做出决定之前,我想问一下哪种方案更有效。也许场景 2) 和 3) 提供了一种机制来满足我的要求,即所有消息都应该被处理,即使负载很重。

编辑:

我已经实现了场景 2,我每秒发送 1000 条消息。 这是使用不同参数的统计信息丢失了多少消息:

最大并发消费者;生存时间=5 秒。空闲消费者限制; # 已发送消息; # 收到的消息

 10 ; Yes ; 1   ; 1001 ; 297
100 ; Yes ; 1   ; 1001 ; 861
150 ; Yes ; 1   ; 1001 ; 859
300 ; Yes ; 1   ; 1001 ; 861
300 ; No  ; 1   ; 1001 ; 860
300 ; No  ; 100 ; 1001 ; 1014
300 ; No  ; 50  ; 1001 ; 1011

似乎 idle-consumer-limit 比 max-concurrent 消费者更积极地创建消费者。这是在这种情况下使用 idle-consumer-limit 的好方法吗?

这是我的发件人/消费者配置文件:

<!-- SENDER  
Keep Alive Sender sends messages to backup server -->    

<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>

<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage"> 
    <bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
    <si:poller fixed-rate="${sender.sendinterval}"></si:poller>
</si:inbound-channel-adapter>    

<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
    <bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>

<!-- Subscriber to a channel dispatcher, Send messages to JMS -->
<int-jms:outbound-channel-adapter  explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}" 
    channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>

<bean id="pres" 
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="PRES" />
</bean>


<!-- RECEIVER -->

<si:channel id="receiveChannel"/>

<!-- get messages from PRES queue -->    
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" 
    channel="receiveChannel" destination="presence" connection-factory="connectionFactory"  idle-consumer-limit="50" 
    max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id ="activatorClient" input-channel="receiveChannel" ref="messageService" method="processMessage"/> 


<bean id="messageService" class="com.cache.MessageService"/>

【问题讨论】:

  • 您似乎有一个奇怪的要求组合。有限的生存时间,加上所需的交付时间会给你带来麻烦。有什么理由让你无法消除生活的时间吗?或者将代理配置为将它们放入持久队列?
  • 确实,这是我们可以省略的要求。但我们需要确保所有消息都被消费。我在场景 2 中的初始测试显示,在一分钟内发送的 1000 条消息仅收到 860 条(开启和关闭生存时间)。
  • 这听起来很奇怪......消息代理都应该能够确保没有消息丢失。

标签: spring messaging spring-integration


【解决方案1】:

首先您可以尝试使用max-concurrent-consumers 属性。如您所见,在您的情况下,1 确实不够。 您应该调查为什么您的MessageService 如此缓慢。 任何其他情况看起来都像是开销,因为 JMS 已经是持久的并且具有异步性质 - 基于队列。 如果没有帮助,请使用&lt;queue&gt; 频道和存在MessageStore,例如MongoDB

【讨论】:

  • 我提高了最大并发消费者的数量,它有所帮助,但仅在一定程度上有所帮助(请参阅我的更新)。 +1 建议。
猜你喜欢
  • 1970-01-01
  • 2014-10-23
  • 2018-02-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-26
相关资源
最近更新 更多