【问题标题】:How to Give manual Acknowledge using JmsTemplate and delete message from Rabbitmq queue如何使用 JmsTemplate 手动确认并从 Rabbitmq 队列中删除消息
【发布时间】:2017-04-09 09:52:15
【问题描述】:

我正在使用带有 jmsTemplate 的 RabbitMq(带 JMS)我能够从 RabbitMq 队列中使用消息,但它需要自动确认。

我有它的搜索 API,但无法找到它。

如何设置手动确认。

在下面的代码中,当从队列中使用消息时,我想使用该消息调用 Web 服务,并取决于来自我想从队列中删除该消息的响应。 我创建了一个项目,其中我正在使用侦听器和其他项目调用从队列中读取消息

第一个项目:

package com.es.jms.listener;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

@Configuration
public class RabbitMqMessageListener {

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("Username");
        connectionFactory.setPassword("Password");
        connectionFactory.setVirtualHost("vhostname");
        connectionFactory.setHost("hostname");

        return connectionFactory;
    }

    @Bean
    public MessageListener msgListener() {
        return new MessageListener() {
            public void onMessage(Message message) {

                System.out.println(message.toString());
                if (message instanceof TextMessage) {
                    try {
                        String msg = ((TextMessage) message).getText();
                        System.out.println("Received message: " + msg);

                        // call web service here and depends on web service
                        // response
                        // if 200 then delete msg from queue else keep msg in
                        // queue

                    } catch (JMSException ex) {
                        throw new RuntimeException(ex);
                    }
                }

            }
        };
    }

    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(jmsConnectionFactory());
        container.setDestinationName("test");

        container.setMessageListener(msgListener());
        return container;

    }
}

第二个项目:

package com.rabbitmq.jms.consumer.controller;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.jms.ConnectionFactory;

import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;


import com.rabbitmq.jms.admin.RMQConnectionFactory;

import redis.clients.jedis.Jedis;

@Controller
public class ReceiverController {
    @Autowired
    JmsTemplate jmsTemplate;


    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("Username");
        connectionFactory.setPassword("Password");
        connectionFactory.setVirtualHost("vhostname");
        connectionFactory.setHost("hostname");

        return connectionFactory;
    }

    @CrossOrigin
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @RequestMapping(method = RequestMethod.GET, value = "/getdata")
    @ResponseBody
    public ResponseEntity<String> fecthDataFromRedis()
            throws JSONException, InterruptedException, JmsException, ExecutionException, TimeoutException {
        System.out.println("in controller");

        jmsTemplate.setReceiveTimeout(500L);
        // jmsTemplate.
        String message = (String) jmsTemplate.receiveAndConvert("test");

                    // call web service here and depends on web service
                    // response
                    // if 200 then delete msg from queue else keep msg in
                    // queue
        System.out.println(message);

        }

        return new ResponseEntity(message , HttpStatus.OK);

    }

}

我该怎么做?

提前致谢。

【问题讨论】:

    标签: spring spring-boot rabbitmq spring-jms


    【解决方案1】:

    您没有使用JmsTemplate,而是使用SimpleMessageListenerContainer 来接收消息。

    如果您使用该模板,则必须使用带有SessionCallbackexecute 方法,因为确认必须发生在接收消息的会话范围内。

    但是,对于SimpleMessageListenerContainer,您只需将sessionAcknowledgeMode 设置为Session.CLIENT_ACKNOWLEDGE。查看容器 javadocs...

    /**
     * Message listener container that uses the plain JMS client API's
     * {@code MessageConsumer.setMessageListener()} method to
     * create concurrent MessageConsumers for the specified listeners.
     *
     * <p>This is the simplest form of a message listener container.
     * It creates a fixed number of JMS Sessions to invoke the listener,
     * not allowing for dynamic adaptation to runtime demands. Its main
     * advantage is its low level of complexity and the minimum requirements
     * on the JMS provider: Not even the ServerSessionPool facility is required.
     *
     * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
     * on acknowledge modes and transaction options. Note that this container
     * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode:
     * that is, automatic message acknowledgment after listener execution,
     * with no redelivery in case of a user exception thrown but potential
     * redelivery in case of the JVM dying during listener execution.
     *
     * <p>For a different style of MessageListener handling, through looped
     * {@code MessageConsumer.receive()} calls that also allow for
     * transactional reception of messages (registering them with XA transactions),
     * see {@link DefaultMessageListenerContainer}.
       ...
    

    编辑

    使用JmsTemplate 时,您必须在会话范围内完成您的工作 - 方法如下...

    首先,您必须在模板中启用客户端确认...

    this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    

    然后,使用 execute 方法和 SessionCallback ...

    Boolean result = this.jmsTemplate.execute(session -> {
        MessageConsumer consumer = session.createConsumer(
                this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "bar", false));
        String result = null;
        try {
            Message received = consumer.receive(5000);
            if (received != null) {
                result = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);
    
                // Do some stuff here.
    
                received.acknowledge();
                return true;
            }
        }
        catch (Exception e) {
            return false;
        }
        finally {
            consumer.close();
        }
    }, true);
    

    【讨论】:

    • 如我所说,你必须使用execute 方法——我添加了一个示例。
    • 感谢分享代码。但我想使用将从队列中消耗的消息调用 Web 服务。在您的代码中,我将无法在其中访问输出的“字符串值”。如果我把它带到外面并处理它,直到消息被确认并从队列中删除。
    • 我不明白你的担心;你可以打电话给你的网络服务,我说// Do some stuff here。您可以退回任何您想要的东西;我只是碰巧将值返回给外部代码。我已将示例更改为返回布尔值 - 也许这样会更清楚。如果你抛出一个异常,它会返回 false 并且消息不会被确认。
    • 感谢分享代码。这解决了问题,我昨天已经工作了一整天。对我来说另一种可行的方法是创建一个新连接和一个消息使用者并在连接上启用 client_ack,但您的解决方案可以使用 jmsTemplate 在 client_ack 中使用消息。问候。
    猜你喜欢
    • 2023-03-09
    • 1970-01-01
    • 1970-01-01
    • 2015-12-20
    • 1970-01-01
    • 2015-11-05
    • 2014-09-26
    • 2015-06-21
    • 2021-07-21
    相关资源
    最近更新 更多