【问题标题】:jms persistent messages and order of messages obtainedjms 持久消息和获得的消息顺序
【发布时间】:2013-09-11 08:53:13
【问题描述】:

我需要什么: 1.通过jms发送消息。 2. 保留消息,以便如果未从客户端和服务器重新启动处理,客户端会在服务器重新启动时获取消息。 3. 从客户端接收消息的顺序应与发送这些消息的顺序相同。 4. 队列中有多个消费者,以便并行扩展和处理不同的消息。

我做了什么: 我使用active mq并具有以下spring配置

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="destination"/>
</bean>

<bean id="template" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="deliveryMode" value="2"/>
    <property name="defaultDestination" ref="destination"/>
</bean>

<bean id="messageListener" class="InternalNotificationJMSConsumer"/>

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" client-id="client1" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

所以我有我的目的地和我的连接池。然后我创建一个 spring jmstemplate 并开始发送消息

template.send(new MessageCreator() {
  public Message createMessage(Session session) throws JMSException {
    return session.createObjectMessage(message);
  }
});

然后我的接收器如下:

@Component
public class JMSConsumer implements MessageListener {

  @Override
  public void onMessage(Message message) {
    if (message instanceof ObjectMessage) {
      ....
     }
   }

我的问题:

一个。使用此配置的消息不是按顺序接收的。鉴于 listener-container.concurency 中的 xsd 注释,这是有道理的:“在主题侦听器或消息顺序很重要的情况下,将并发限制为 1;”。所以问题是我如何/是否可以保持订单但有多个消费者。

b.消息确认。我需要的是,在处理完消息后,onMessage 返回,然后才将消息视为已确认并且不会重新传输。我没有事务管理器,因为我不在任何应用程序服务器内,并且希望尽可能避免这种情况。我只需要自己确认消息。这可能吗?

【问题讨论】:

    标签: spring jms activemq


    【解决方案1】:

    如果您正在并行使用消息,那么由于线程的性质,永远无法保证它们将完全按照它们发送的顺序完成 - 只有它们会按顺序传递给消费者。如果只有某些消息是绝对必须订购的(通常与交易/帐户相关),您可以使用Message Groups 功能将组与消费者关联 - 一种粘性负载平衡。

    自己确认消息绝对是可能的,但可能不需要。 MessageListenerContainer 有一个 acknowledge 属性,如果设置为 transacted 将导致消息仅在 MessageListener 完成执行后被确认为已使用,而不会引发异常。不需要事务管理器(尽管 Spring 的 PlatformTransactionManager 不需要任何特定容器,如果您确实想使用一个容器)。

    如果您确实想自己确认消息,请将acknowledge 设置为client,并在消息成功完成后调用message.acknowledge()。如果侦听器在没有调用它的情况下完成,则消息将被发送到 DLQ(默认)或重新传递。

    【讨论】:

    • "只是他们会按顺序交给消费者。"--> 好的,这就是我关心的,那么处理顺序是另一回事。那么如何确保消息按顺序交给消费者呢?通过消息组?关于 message.acknowledge(),这是我尝试过但没有成功的方法,我会尝试使用 transacted,因为它听起来很有希望
    • 切换总是由 ActiveMQ 以循环方式在消费者之间按顺序完成。在消费者端有一种称为预取缓冲区的机制,它充当缓存,这样您就不必不断地向代理询问更多消息 - 默认情况下,每个队列设置为 1000 条消息。根据到达时间,消费者可能会在下一个消费者到达之前填满该缓冲区。在jms:listener-container 上通过prefetch="1" 设置它以获得消息的公平分配。
    • 首先谢谢你,杰克,我需要说出来,因为你的反应很快而且中肯。也就是说,请检查我上面的代码。已经这样做并设置了 prefetch="1"。我理解你所说的,这也是我的理解,但这不是我在代码中看到的。
    • 不确定这是否来自 activemq/spring 代码或乱序的消息首先保留在 kahadb 中,因为这是不可能的那个时刻要处理,然后当存在可用线程来接收消息时,这些线程会返回到表面。我将删除持久性并重新检查消息是否仍然乱序并再次对此发现发表评论。也许问题存在于 kahadb 级别......如果是这样,可能需要一些其他配置来优先处理持久消息
    • 你如何检查消息是否乱序?
    【解决方案2】:

    好的,这就是我的问题的答案。

    <jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" prefetch="1">
          <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
    </jms:listener-container>
    

    然后是消息监听器的实现: BlockingExecutor 执行器 = 新的 BlockingExecutor(5); //要消费的并发按摩的数量

    public void onMessage(Message message) {
    ....
    executor.submitTask(new MessageDispatchCommand((ObjectMessage) message));
    ....
    

    }

    class MessageDispatchCommand implements Runnable {
      private ObjectMessage message;
    
      public MessageDispatchCommand (final ObjectMessage message) {
        this.message = message;
      }
    
      public void run() {
        try {
          Serializable msg = message.getObject();
          handle message here
         }....
    }
    

    BlockingExecutor 代码可以从这里找到: Java Concurrency in Practice: BoundedExecutor implementation

    简单地说。我只有一个 jms 消息侦听器,因为这可以保持消息顺序正确。 然后我创建可运行的命令来获取消息并处理它们(这是完成的耗时工作,但它是在不同的线程中完成的)。 由于我在所有消息处理线程都已耗尽时使用有界执行器,因此 onMessage 会阻塞,因此后续消息会简单地持久化,直到有界执行器的线程再次空闲。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-06-14
      • 2013-05-28
      • 2015-07-02
      • 2011-07-15
      • 2014-05-18
      相关资源
      最近更新 更多