【问题标题】:RabbitMQ - How do I send fanout style message that is processed by only one of a group of consumers and all consumers outside the groupRabbitMQ - 如何发送仅由一组消费者中的一个和组外的所有消费者处理的扇出样式消息
【发布时间】:2015-01-13 13:45:33
【问题描述】:

我对 rabbitMQ 还很陌生,想知道如何设置系统以最适合我的需求。

假设我有 5 个消费者进程 (C1-5),而 C1-3 只需要 一个 来使用该消息。 C4 和 C5 需要接收消息。

如何配置 rabbitMQ 来实现这一点?

我考虑过在 C1-3 前面弹出另一个消费者来简单地推送到标准队列,C1-3 将从那里消费,但我想知道这是否是额外的工作,rabbitMQ 有更好的方法来解决这个问题?

非常感谢任何建议。

亲切的问候, 棒棒哒

【问题讨论】:

  • 你的图表正是兔子的工作原理,它应该很好。

标签: queue rabbitmq


【解决方案1】:

这很容易做到,只需将 C1-C3 附加到同一个队列即可。不需要额外的消费者/发布者路线。


示例:

(使用 PHP 和videlalvaro/php-amqplib。)

send.php

声明扇出交换“事件”并向其发布消息。

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'events';

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchange, 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Something happened!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, $exchange);

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

pooledreceive.php (C1-C3)

附加到一个命名的“池” 队列。消息将在接收者之间分发。

use PhpAmqpLib\Connection\AMQPConnection;

$exchange = 'events';
$queue = 'pool';

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchange, 'fanout', false, false, false);
$channel->queue_declare($queue);
$channel->queue_bind($queue, $exchange);

echo ' [*] Waiting for events. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

receive.php (C4, C5)

每个接收者都有自己的队列和自己的消息副本。

use PhpAmqpLib\Connection\AMQPConnection;

$exchange = 'events';

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchange, 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare('');

$channel->queue_bind($queue_name, $exchange);

echo ' [*] Waiting for events. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

【讨论】:

    猜你喜欢
    • 2015-10-03
    • 2016-05-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-12
    • 2020-12-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多