我只是参考 rabbitmq 文档中的几点我认为能够满足您的要求。
- 第一参考 - https://www.rabbitmq.com/consumer-priority.html
通常,连接到队列的活动消费者从
它以循环方式进行。当使用消费者优先权时,
如果存在多个活动消费者,则消息将循环传递
具有相同的高优先级。
我假设您所有的消费者都具有相同的优先级,因此消息将平均分配给所有活跃的消费者。
- 第二个参考 - https://www.rabbitmq.com/consumer-prefetch.html
basic.qos 方法允许您限制数量
消费时通道(或连接)上未确认的消息
正如您所提到的,您将在一台机器上拥有单个消费者,这更加容易。
只需为每个消费者设置消费者预取限制 1。因此,在要求确认之前,服务器将只向消费者传递一条消息。并在您的消息完全处理后发送基本确认。
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(1); // Per consumer limit
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("received message");
// process the message .. time consuming
// after processing send the basic ack so that next message can be received from queue
channel.basicAck(envelope.getDeliveryTag(), false);
};
channel.basicConsume("my-queue", false, consumer);
我希望这会有所帮助。
更新 -
只是添加更多描述-
当你使用
channel.basicQos(x);
Rabbitmq 将向通道上的每个消费者推送(如果在队列中可用,当然在遵守优先级和循环等之后)最大 x 数量的未确认消息。这意味着通道上的每个消费者不会有超过 x 条未确认的消息,也就是说,消费者可以在任何给定时刻同时处理最多 x 条消息。一旦消费者发回一个确认,下一条消息就可以被推送给它。如果消费者觉得它无法处理消息,它也可以发送一个 nack。在这种情况下,消息将被重新排队,重新排队的消息可能会根据优先级、循环等方式发送到队列中的任何消费者。
每个渠道可以有多个消费者。所以,当你使用
channel.basicQos(x, true);
限制 x 适用于整个渠道,而不是渠道上的单个/每个消费者。
在您的情况下,每个渠道只有一个消费者。所以频道限制实际上对你的情况没有任何影响。
更多更新 -
一台机器通过连接连接到 RabbitMQ。一个连接可以有多个通道。并且一个频道可以有多个消费者。所以逻辑上可以有不同的机器连接到 RabbitMQ 并且有多个通道和消费者在同一个队列上监听。您可以同时为通道(使用channel.basicQos(x, true))和通道内的消费者(使用channel.basicQos(x, false))设置QOS 限制。限制 0 表示无限制。显然,这些限制适用于通道实例。驻留在不同通道实例(同一台机器或不同机器上)的所有消费者将有自己的限制(默认值或通过 QOS 方法显式设置)。