This is easy to do: just attach C1-C3 to the same queue. No additional consumer/publisher routing is needed.
Example:
(Using PHP and videlalvaro/php-amqplib.)
send.php
Declares a fanout exchange named "events" and publishes a message to it.
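<?php
// Assumption: the library was installed with Composer next to this script.
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;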
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'events';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Something happened!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange);
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
pooledreceive.php (C1-C3)
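Attaches to a named "pool" queue. Messages are distributed among the receivers, so each message is handled by only one of them.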
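<?php
// Assumption: the library was installed with Composer next to this script.
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;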
$exchange = 'events';
$queue = 'pool';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
// Every pooled receiver declares the same named queue, so C1-C3 share it.
$channel->queue_declare($queue);
$channel->queue_bind($queue, $exchange);
echo ' [*] Waiting for events. To exit press CTRL+C', "\n";
$callback = function($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();
receive.php (C4, C5)
Each receiver has its own queue and gets its own copy of every message.
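<?php
// Assumption: the library was installed with Composer next to this script.
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;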
$exchange = 'events';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
// An empty name asks the broker to generate a unique queue name for this receiver.
list($queue_name, ,) = $channel->queue_declare('');
$channel->queue_bind($queue_name, $exchange);
echo ' [*] Waiting for events. To exit press CTRL+C', "\n";
$callback = function($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();
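
To see why this topology gives the desired behaviour without running a broker, here is a small in-memory sketch of the routing. It is only a model, not php-amqplib: the names SimQueue, SimFanoutExchange and simulate are hypothetical, and the round-robin hand-out in deliver() is a simplifying assumption about how a broker spreads messages over the consumers of one queue.

```php
<?php
// Broker-free model of the setup above. Every name in this file is
// hypothetical and exists only for illustration; nothing here talks to RabbitMQ.

final class SimQueue
{
    /** @var callable[] consumers attached to this queue */
    private $consumers = [];
    /** @var int index of the consumer that receives the next message */
    private $next = 0;

    public function attach(callable $consumer): void
    {
        $this->consumers[] = $consumer;
    }

    public function deliver(string $body): void
    {
        if ($this->consumers === []) {
            return; // this sketch drops messages instead of buffering them
        }
        // Assumed round-robin: each message goes to exactly one consumer.
        $consumer = $this->consumers[$this->next];
        $this->next = ($this->next + 1) % count($this->consumers);
        $consumer($body);
    }
}

final class SimFanoutExchange
{
    /** @var SimQueue[] queues bound to this exchange */
    private $queues = [];

    public function bind(SimQueue $queue): void
    {
        $this->queues[] = $queue;
    }

    public function publish(string $body): void
    {
        // Fanout: every bound queue receives its own copy of the message.
        foreach ($this->queues as $queue) {
            $queue->deliver($body);
        }
    }
}

/** Publishes $messageCount events and returns what each consumer received. */
function simulate(int $messageCount): array
{
    $received = ['C1' => [], 'C2' => [], 'C3' => [], 'C4' => [], 'C5' => []];
    // Builds a consumer callback that records message bodies under $name.
    $record = function (string $name) use (&$received): callable {
        return function (string $body) use (&$received, $name): void {
            $received[$name][] = $body;
        };
    };

    $events = new SimFanoutExchange();

    // C1-C3 share one queue, like pooledreceive.php.
    $pool = new SimQueue();
    $events->bind($pool);
    foreach (['C1', 'C2', 'C3'] as $name) {
        $pool->attach($record($name));
    }

    // C4 and C5 each get a private queue, like receive.php.
    foreach (['C4', 'C5'] as $name) {
        $private = new SimQueue();
        $events->bind($private);
        $private->attach($record($name));
    }

    for ($i = 1; $i <= $messageCount; $i++) {
        $events->publish("event $i");
    }
    return $received;
}

foreach (simulate(6) as $name => $bodies) {
    echo $name, ': ', implode(', ', $bodies), "\n";
}
```

With six published messages, C1-C3 each record two of them, while C4 and C5 each record all six. The pool behaves as a single subscriber from the exchange's point of view, which is why no extra routing is needed.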