【问题标题】:Consume multiple queues by priority按优先级消费多个队列
【发布时间】:2013-09-20 21:38:31
【问题描述】:

我有一个生产者向主题交换发送消息。每条消息都包含一个路由键。 (为原始图道歉)

       P
       |
       X
     /| |\
    / | | \
   /  | |  \
   Q1 Q2 Q3 Q4
   |   / /  /
   |  / /  /
   | / /  /
   |/ /  /
      C

我正在使用php-amqplib 并尝试使用多个队列。我想要实现的是顺序测试每个队列,查看它是否有消息,如果有,则处理它,否则,继续下一个队列。此外,如果找到消息,请从 Q1 重新开始检查过程。以下代码不起作用,但将演示我想要做的逻辑。

$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

$channel->exchange_declare('myexchange', 'topic', false, false, false);

$channel->queue_declare("Q1", false, true, false, false);
$channel->queue_bind("Q1", 'myexchange', 'priority.1');

$channel->queue_declare("Q2", false, true, false, false);
$channel->queue_bind("Q2", 'myexchange', 'priority.2');

$channel->queue_declare("Q3", false, true, false, false);
$channel->queue_bind("Q3", 'myexchange', 'priority.3');

$channel->queue_declare("DFQ4", false, true, false, false);
$channel->queue_bind("DFQ4", 'myexchange', 'priority.4');

$queues = array('Q1','Q2','Q3','Q4');

$priority = 0;
while (1) {

    $priority = ($priority<4)? $priority+1 : 0;

    $msg = $channel->basic_consume($queues[$priority], $consumer_tag, false, false, false, false);
    if(isset($msg->body)) {
        echo ' [x] ',$msg->delivery_info['routing_key'], "\n";
        $channel->basic_ack($msg->delivery_info['delivery_tag']);
        $priority = 0;
    }
}

$channel->close();
$connection->close();

【问题讨论】:

  • 我不明白为什么您要再次检查队列而不是在收到消息时对其进行处理。而且我不明白当您可以声明任意数量的消费者并并行处理所有内容时,为什么需要优先级。
  • Q1 中的消息应在 Q2 中的一条之前处理,Q3 中的一条之前处理,依此类推。比如说,在某个时间点,只有在第四季度才有消息。一个将被处理,然后在尝试在 Q4 处理下一条消息之前检查是否有任何消息出现在 Q1、Q2 和 Q3。 Q1 的工作需要首先完成,因此需要优先级。 Doug Barth 描述了一个可能的解决方案 [dougbarth.github.io/2011/07/01/…
  • 您是否尝试过使用 basic_get 而不是 basic_consume 的代码?

标签: php rabbitmq php-5.4 php-amqp


【解决方案1】:

我的 .NET 项目也有同样的要求。基本上,我有类似的方法,但我使用标头交换将发布的消息提供给这些队列。我创建的库订阅所有这些队列(具有相同的预取大小值)并将所有这些消息放入内部内存队列,这些消息可以按其优先级排序。所以在管道的最后,处理消息的实际代码将从内部队列中读取它们,优先级较高的消息将被处理,然后在优先级较低的消息之前被确认。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-02-08
    • 1970-01-01
    • 1970-01-01
    • 2023-04-02
    • 1970-01-01
    • 2011-12-20
    • 2023-04-02
    • 2011-03-20
    相关资源
    最近更新 更多