【发布时间】:2014-09-11 19:38:21
【问题描述】:
我们可以通过从https://www.rabbitmq.com/community-plugins.html 安装插件 rabbitmq-priority-queue 使 RabbitMQ 成为分布式优先级队列。我将元素推送到队列中(每个元素都按优先级推送),并且我能够根据需要在消费者中接收队列的内容 - 优先级较高的元素首先出现。
问题是当这种情况持续发生时,优先轮询概念不起作用:
- 运行发布者以在队列中填充具有不同优先级的 3 个项目。
- 使用队列中的消息 - 运行良好 - 按 优先事项。现在消费者等待队列中的任何消息,截至 现在队列是空的。
- 我再次运行发布者以填充大约 5 个元素。
- 消费者不会优先消费队列中的 5 个项目,而是按照发布者发布的第 3 步的顺序消费。
我需要的是每次轮询队列中所有内容中具有最高优先级的队列项应该首先出现。
谁能告诉我这里发生了什么错误?谢谢。
这里是发布者和消费者(Java)的sn-p:
出版商
public class RabbitMQPublisher {
private static final String QUEUE = "my-priority-queue-3";
public static void main(String[] argv) throws Exception {
final ConnectionFactory factory = new ConnectionFactory();
final Connection conn = factory.newConnection();
final Channel ch = conn.createChannel();
final Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 100);
ch.queueDeclare(QUEUE, true, false, false, args);
publish(ch, 24);
publish(ch, 11);
publish(ch, 75);
//second run
//publish(ch, 27);
//publish(ch, 77);
//publish(ch, 12);
conn.close();
}
private static void publish(Channel ch, int priority) throws IOException {
final BasicProperties props = MessageProperties.PERSISTENT_BASIC.builder().priority(priority).build();
final String body = "message with priority " + priority;
ch.basicPublish("", QUEUE, props, body.getBytes());
}
消费者
while (true) {
final QueueingConsumer.Delivery delivery = consumer.nextDelivery();
final String message = new String(delivery.getBody());
System.out.println(message);
}
输出:
message with priority 75
message with priority 24
message with priority 11
message with priority 27
message with priority 77
message with priority 12
【问题讨论】:
-
您是否费心设置
basic.qos?否则可能会在那个while循环中睡一会儿,看看会发生什么。 -
感谢您的回复@Adam Gent。我通过使用 basicGet 而不是 consumer.nextDelivery 来完成上述工作。最终字符串消息 = new String(channel.basicGet(QUEUE_NAME, true).getBody());这会根据队列中的优先级检索项目。