ramantic

原文来自 RabbitMQ 英文官网教程(3.Publish and Subscribe),其示例代码采用了 .NET C# 语言。

Markdown

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

在之前的教程中我们创建了一个工作队列,其背后的设想便是每一个任务恰好地递送给一个工作单元。在本教程中,我们的做法将完全不同 -- 即递送消息给多个消费者,这个模式被称作“发布/订阅”。

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.

为了说明这个模式,我们将要构建一个简单日志系统。它将由两个程序组成 -- 第一个将会发送日志消息,而第二个将会接收并打印它们。

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.

在我们的日志系统中,每一个正在运行的接收程序副本都将获得消息。如此,我们将运行一个接收者来将日志写入磁盘,与此同时再运行另一个接收者,这样可以在屏幕上查看日志。

Essentially, published log messages are going to be broadcast to all the receivers.

本质上来讲,已发布的日志消息将会广播给所有的接收者。

Exchanges

交换机

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.

在本教程的之前部分,我们从队列中发送和接收消息,现在是时候来介绍 Rabbit 中完整的消息模型了。

Let's quickly go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

让我们快速温习一下之前教程中所包括的内容:

  • 生产者,用以发送消息的用户程序。
  • 队列,即存储消息的缓冲区。
  • 消费者,用以接收消息的用户程序。

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

在 RabbitMQ 中,消息模型的核心理念是生产者永远不会把任何消息直接发送到队列。事实上,通常生产者甚至不知道消息是否会被递送到任何队列。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

取而代之的是,生产者只能发送消息给一个交换机。交换机很简单,一方面它从生产者接收消息,另一方面它又把消息推送到队列。但是交换机必须明确知道对它所接收到的消息该做什么。(比如)需要将消息追加到指定队列?还是追加到多个队列?还是要丢弃?这些规则都是在交换机类型中定义。

Markdown

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:

目前有若干交换机类型可用:direct、topic、headers 以及 fanout。我们将以最后一个为重点 -- fanout,让我们创建一个该类型的交换机,并将其叫作“logs”:

channel.ExchangeDeclare("logs", "fanout");

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.

fanout 型交换机非常简单,从它的名称你可能猜出,它会广播所有已接收到的消息给所有已知的队列,这正好是我们的日志系统所需要的。

Listing exchanges
列举出交换机

To list the exchanges on the server you can run the ever useful rabbitmqctl:

为了列举出服务端的交换机,你可以运行此前非常有用的 rabbitmqctl 命令:

sudo rabbitmqctl list_exchanges

In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.

在这个列表中有一些 amq.* 和默认的(未命名)交换机,这些都是默认创建的,但此时你不太可能需要用到它们。

The default exchange
默认的交换机

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").

在教程的之前部分我们对交换机还一无所知,但这并不妨碍我们能够发送消息到队列中,这之所以成为可能,是因为我们使用了基于空字符串来标识的默认交换机

Recall how we published a message before:

回忆一下以前我们是如何发布消息的:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                    routingKey: "hello",
                    basicProperties: null,
                    body: body);

The first parameter is the the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.

第一个参数就是交换机的名字。空字符串表示默认的或者匿名的交换机:根据明确的路由键(routingKey)将消息路由到已存在的队列。

Now, we can publish to our named exchange instead:

现在,我们用具名(自定义命名)的交换机来进行发布:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

Temporary queues

临时队列

As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

你可能还记得之前我们使用过指定名称的队列(记得好像是 hello 和 task_queue?)。能够为队列命名对我们来说是至关重要的 -- 我们需要将工作单元指向相同的队列。同样,当你想在生产者和消费者之间共享队列时,为队列赋予一个名字也是非常重要的。

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.

但是,以上并不是我们日志系统的案例。我们想要监听所有的日志消息,而不仅仅是它们的一部分。我们只对当前正在流动的消息感兴趣,而不是旧的消息,为解决这个问题我需要做好两件事。

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.

首先,无论我们何时连接到 Rabbit,都需要一个崭新的、空的队列,为做到这一点我们可以使用随机名称来创建一个队列,当然更好的做法是,让服务端为我们选择一个随机名称。

Secondly, once we disconnect the consumer the queue should be automatically deleted.

其次,一旦我们与消费者断开连接,相关的队列也应当能自动删除。

In the .NET client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:

在 .NET 客户端中,当我们调用 queueDeclare 方法而并未提供任何参数时,实际上就是创建了一个非持久化、独享,且自动删除的具名队列。

var queueName = channel.QueueDeclare().QueueName;

At that point queueName contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.

在此处 queueName 包含的是一个随机队列名称,比如它看起来可能像 amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings

绑定

Markdown

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.

我们已经创建了一个 fanout 型交换机和队列,现在我们需要告诉交换机把消息发送到队列,那么,交换机和队列之间的关系就被称作绑定

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

From now on the logs exchange will append messages to our queue.

从现在开始,日志交换机将会把消息追加到队列中。

Listing bindings
列举绑定

You can list existing bindings using, you guessed it:

你可以列举出现有的绑定,(所使用的命令)你该可以猜到:

rabbitmqctl list_bindings

Putting it all together

融合一起

Markdown

The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here goes the code for EmitLog.cs file:

发出日志消息的生产者程序,与之前教程看起来并无太大不同。现在,最重要的变化莫过于我们想把消息发布到名为 logs 的交换器中,而非匿名。在发送消息时我们需要提供一个路由键(routingKey),只不过它的值在 fanout 型交换机中被忽略了,针对 EmitLog.cs 文件的代码如下:

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

(EmitLog.cs source)

As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.

如同你所见,在建立好连接之后我们声明了交换机。这一步非常有必要,因为禁止向一个不存在的交换机发布消息。

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.

如果尚没有任何队列绑定到交换机,消息将会丢失,但这对我们来说并没有什么问题;如果没有任何消费者正在监听,我们可以将消息安全地删除。

The code for ReceiveLogs.cs:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

(ReceiveLogs.cs source)

Follow the setup instructions from tutorial one to generate the EmitLogs and ReceiveLogs projects.

从教程的第一章开始,跟随安装说明来生成 EmitLogs 和 ReceiveLogs 工程。

If you want to save logs to a file, just open a console and type:

如果你想保存日志到一个文件,只需打开控制台并输入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log

If you wish to see the logs on your screen, spawn a new terminal and run:

如果你想在屏幕上查看日志,重开一个新的终端并运行:

cd ReceiveLogs
dotnet run

And of course, to emit logs type:

当然,产生日志只需输入:

cd EmitLog
dotnet run

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two ReceiveLogs.cs programs running you should see something like:

使用 rabbitmqctl list_bindings 命令你可以核实代码的确已经创建了我们期望的绑定和队列,伴随着 ReceiveLogs.cs 程序的运行你应该可以看到类似如下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.

对结果的解释就非常简洁明了:来自 logs 交换机的数据将去往两个由服务端分配名称的队列,而这恰好是我们所期望的。

相关文章: