【问题标题】:RabbitMQ direct reply to. I'm getting the AlreadyClosedExceptionRabbitMQ 直接回复。我收到了 AlreadyClosedException
【发布时间】:2017-10-09 09:17:11
【问题描述】:

已解决:移动

channel.basicPublish("", QUEUE, props, message.getBytes());

下面

channel.basicConsume(replyQueue, ...)

这解决了问题。


我正在尝试弄清楚如何使用 RabbitMQ 直接回复功能。由于documentation 对如何实现它比较模糊,我尝试使用RPC 示例,采用它来使用直接回复。

private final static String QUEUE = "Test_chan";
private void directReplyToClient(ConnectionFactory factory) {
    Connection connection = null;
    Channel channel = null;
    String replyQueue;

    try {
        connection = factory.newConnection();
        channel = connection.createChannel();

        //replyQueue = channel.queueDeclare().getQueue();
        replyQueue = "amq.rabbitmq.reply-to";
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .replyTo(replyQueue)
                .build();
        String message = "Hello World";
        channel.basicPublish("", QUEUE, props, message.getBytes());

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {

                response.offer(new String(body, "UTF-8"));

            }
        });

        System.out.println(response.take());

    } catch (IOException | TimeoutException | InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            if (channel != null)
                channel.close();
            if (connection != null)
                connection.close();
        } catch (IOException | TimeoutException _ignore) {}
    }
}

设置回复地址

channel.queueDeclare().getQueue()

有效,但将其设置为

amq.rabbitmq.reply-to

给出以下例外:

线程“主”com.rabbitmq.client.AlreadyClosedException 中的异常: 由于通道错误,通道已经关闭;协议方法: method(reply-code=406, reply-text=PRECONDITION_FAILED - 快速回复消费者不存在, class-id=60, method-id=40)

有人知道我做错了什么吗?任何指针将不胜感激。

【问题讨论】:

  • 刚刚意识到异常是我试图关闭底部的频道。所以我试图关闭一个已经关闭的频道。为什么已经关门了?
  • 我突然想通了。只需将 basicPublish 调用移到 basicConsume 调用下方,它就可以发挥作用。

标签: java rabbitmq


【解决方案1】:

所以这是解决方案的代码。在发布之前进行消费。

private final static String QUEUE = "Test_chan";

private void directReplyToProducer(ConnectionFactory factory) {
    Connection connection = null;
    Channel channel = null;
    String replyQueue;

    try {
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueue = "amq.rabbitmq.reply-to";
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .replyTo(replyQueue)
                .build();
        String message = "Hello World";

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        System.out.println(" [x] Sent x'" + message + "'");

        channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                response.offer(new String(body, "UTF-8"));
            }
        });
        channel.basicPublish("", QUEUE, props, message.getBytes());

        System.out.println(response.take());
        Thread.sleep(10000);

    } catch (IOException | TimeoutException | InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            if (channel != null)
                channel.close();
            if (connection != null)
                connection.close();
        } catch (IOException | TimeoutException _ignore) {}
    }
}

【讨论】:

  • 天哪,有了你的回答,我在 C# 客户端中解决了我的问题。非常感谢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多