【发布时间】:2017-10-09 09:17:11
【问题描述】:
已解决:移动
channel.basicPublish("", QUEUE, props, message.getBytes());
下面
channel.basicConsume(replyQueue, ...)
这解决了问题。
我正在尝试弄清楚如何使用 RabbitMQ 直接回复功能。由于documentation 对如何实现它比较模糊,我尝试使用RPC 示例,采用它来使用直接回复。
private final static String QUEUE = "Test_chan";
private void directReplyToClient(ConnectionFactory factory) {
Connection connection = null;
Channel channel = null;
String replyQueue;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//replyQueue = channel.queueDeclare().getQueue();
replyQueue = "amq.rabbitmq.reply-to";
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.replyTo(replyQueue)
.build();
String message = "Hello World";
channel.basicPublish("", QUEUE, props, message.getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
response.offer(new String(body, "UTF-8"));
}
});
System.out.println(response.take());
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (channel != null)
channel.close();
if (connection != null)
connection.close();
} catch (IOException | TimeoutException _ignore) {}
}
}
设置回复地址
channel.queueDeclare().getQueue()
有效,但将其设置为
amq.rabbitmq.reply-to
给出以下例外:
线程“主”com.rabbitmq.client.AlreadyClosedException 中的异常: 由于通道错误,通道已经关闭;协议方法: method(reply-code=406, reply-text=PRECONDITION_FAILED - 快速回复消费者不存在, class-id=60, method-id=40)
有人知道我做错了什么吗?任何指针将不胜感激。
【问题讨论】:
-
刚刚意识到异常是我试图关闭底部的频道。所以我试图关闭一个已经关闭的频道。为什么已经关门了?
-
我突然想通了。只需将 basicPublish 调用移到 basicConsume 调用下方,它就可以发挥作用。