【问题标题】:How to synchronize consumers with rabbitmq如何将消费者与rabbitmq同步
【发布时间】:2018-05-20 14:01:10
【问题描述】:

我有以下问题: 我有一个 RabbitMQ 集群、一个消息生产者和一个消费者集群(用于高可用性)。 每当消费者接收到消息时,它都会根据消息内容生成另一个进程并运行它。这个过程很长,大概需要30分钟。

我必须确保每次处理一条消息。但是,消费者多于一个,因此如果队列中有 2 条消息,则一个消费者获取一条消息,第二个消费者获取另一条消息,并且它们被并行处理。

供参考:每个消费者驻留在不同的机器上。

在 RabbitMQ 级别上是否有任何机制可以让我等待消费下一条消息,直到前一条消息被确认?还是我必须在服务器之间开发一些锁定机制?

【问题讨论】:

  • 在我的回答帖子中添加了一些更新。
  • 添加了更多更新。

标签: synchronization rabbitmq synchronous


【解决方案1】:

正如@vsoni 通常解释的那样,我们可以使用基本的 QOS,这将允许每个消费者一次消费 X 条消息,更准确地说是每个频道。由于通道是“共享单个 TCP 连接的轻量级连接”(请参阅​​here),因此据我所知,不可能在两个不同机器上运行的两个消费者之间建立 QOS。我的目标是不向第二个消费者发送消息,而第一个消费者仍在消费前一个消费者。我只能通过外部锁定来实现它(出于开发目的,我在一台机器上制作了一个简单的文件锁和 3 个使用者),但对于生产我可能会使用DynamoDB locking。其他选择是使用 Zookeeper、etcd 或类似软件的分布式锁定。

【讨论】:

  • 参考此链接 - rabbitmq.com/blog/2014/02/19/… ... 可能会有所帮助。但只是想了解....如果您不希望其他消费者(甚至驻留在不同的机器上)在消费者仍在处理一条消息时处理一条消息,那么为什么需要多个消费者。一个消费者就足够了。
  • 这是为了高可用性,我在问题中提到过。我们在两个可用区(AWS)中有机器,万一“死”了,我们仍然希望能够处理消息,但是(不幸的是,由于 API 限制)我们不能一次连接两个客户端)。但是感谢您提供的所有信息,这真的很有帮助!
【解决方案2】:

我只是参考 rabbitmq 文档中的几点我认为能够满足您的要求。

  1. 第一参考 - https://www.rabbitmq.com/consumer-priority.html

通常,连接到队列的活动消费者从 它以循环方式进行。当使用消费者优先权时, 如果存在多个活动消费者,则消息将循环传递 具有相同的高优先级。

我假设您所有的消费者都具有相同的优先级,因此消息将平均分配给所有活跃的消费者。

  1. 第二个参考 - https://www.rabbitmq.com/consumer-prefetch.html

basic.qos 方法允许您限制数量 消费时通道(或连接)上未确认的消息

正如您所提到的,您将在一台机器上拥有单个消费者,这更加容易。

只需为每个消费者设置消费者预取限制 1。因此,在要求确认之前,服务器将只向消费者传递一条消息。并在您的消息完全处理后发送基本确认。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(1); // Per consumer limit

Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("received message");
                // process the message .. time consuming

                // after processing send the basic ack so that next message can be received from queue
                channel.basicAck(envelope.getDeliveryTag(), false);
        };

 channel.basicConsume("my-queue", false, consumer);

我希望这会有所帮助。

更新 -

只是添加更多描述- 当你使用

channel.basicQos(x);

Rabbitmq 将向通道上的每个消费者推送(如果在队列中可用,当然在遵守优先级和循环等之后)最大 x 数量的未确认消息。这意味着通道上的每个消费者不会有超过 x 条未确认的消息,也就是说,消费者可以在任何给定时刻同时处理最多 x 条消息。一旦消费者发回一个确认,下一条消息就可以被推送给它。如果消费者觉得它无法处理消息,它也可以发送一个 nack。在这种情况下,消息将被重新排队,重新排队的消息可能会根据优先级、循环等方式发送到队列中的任何消费者。

每个渠道可以有多个消费者。所以,当你使用

channel.basicQos(x, true);

限制 x 适用于整个渠道,而不是渠道上的单个/每个消费者。

在您的情况下,每个渠道只有一个消费者。所以频道限制实际上对你的情况没有任何影响。

更多更新 -

一台机器通过连接连接到 RabbitMQ。一个连接可以有多个通道。并且一个频道可以有多个消费者。所以逻辑上可以有不同的机器连接到 RabbitMQ 并且有多个通道和消费者在同一个队列上监听。您可以同时为通道(使用channel.basicQos(x, true))和通道内的消费者(使用channel.basicQos(x, false))设置QOS 限制。限制 0 表示无限制。显然,这些限制适用于通道实例。驻留在不同通道实例(同一台机器或不同机器上)的所有消费者将有自己的限制(默认值或通过 QOS 方法显式设置)。

【讨论】:

  • 谢谢!不确定我是否理解正确。如果我将 QOS 设置为 1,有两个相同的消费者,那么不可能同时消费两条消息?
  • 嗨,很抱歉回复晚了。感谢所有的解释。我现在看到了 QOS 是如何工作的,但我试图弄清楚如何将许多消费者连接到一个通道(使用 Node.js)。如果我能完成,我会解决问题:) 我会更新进度
  • 所以从我在这里可以看到:stackoverflow.com/questions/18418936/… 这是不可能的。通道是 TCP 连接中的“虚拟连接”。我不认为我可以使用相同的 TCP 连接在两台不同的机器上创建两个不同的客户端;)所以我认为这是不可能的,我将不得不进行一些外部锁定:(
猜你喜欢
  • 2018-10-16
  • 1970-01-01
  • 1970-01-01
  • 2011-04-15
  • 1970-01-01
  • 2013-10-10
  • 1970-01-01
  • 1970-01-01
  • 2014-07-23
相关资源
最近更新 更多