yangxu-pro

到目前为止,我们项目的结果大致如下:

 

 

  • 传感器生成的模拟数据(包含传感器名称、数据、时间戳)是通过传感器在运行时动态创建的 Queue 来发送的。这些 Queue 很难直接被发现。

  • 为了解决这个问题,我创建了另一个消息,它包含各传感器的 Queue 的路由 key,这个消息是在一个“众所周知”的 Queue 上发布的,所以协调器就可以得到传感器的路由信息。

  • 传感器的数据是发布在默认的 Direct     Exchange 上,也就是说只有一个消费者可以收到这个消息,这就是我们想要的效果。具体的,无论有多少个协调器,RabbitMQ 会保证只有一个协调器会收到信息,并且只会收到一次。

  • 然后,用于发现传感器的路径确有不同的需求,如果存在多个协调器,那么当传感器上线的时候,所有的协调器都必须得知,所以就不能使用 Direct     Exchange 了。这时使用 Fanout Exchange 就比较合理了,Fanout Exchange 将会同时通知所有附加在 Exchange 上面的 Queue,也就是把传感器的路由信息发送给所有在线的协调器。

  • 但是这也有其他问题:如果没有接收者监听,那么这些路由信息不会保留,这个问题稍后再解决,我们先把发布路由信息的 Exchange 从 Direct 改为 Fanout。

 

使用 Fanout Exchange 发布传感器路由信息

目前,在传感器项目中,我们使用默认的 Direct Exchange 来发布传感器路由消息:

 

 

看一下管理控制台,可以看到 RabbitMQ 还提供了一个 Fanout Exchange 也就是 amq.fanout: 

 

 

 

 

修改代码,暂时改用 amq.fanout 来发布传感器路由信息:

 

 

 

  1. 首先,删除第 38 行的代码,它原是用来创建一个 Queue 以便协调程序可以接收到传感器的路由信息。现在,这个工作将由 Exchange 的消费者们来完成,它们会创建自己的 Queue 来监听这个 Exchange。

  2. 第 43 行,把路由 Key 改为 “”,因为 Fanout Exchange 不需要使用该 Key 来决定消息发往哪里,它会把消息进行复制并发送到每个绑定到它的 Queue 上面。

  3. 最后,第 42 行,把 exchange 这个参数改为 amq.fanout。

 

运行 sensors 项目查看效果

 

 

 

打开控制台:

 

 

 

可以看到 amq.fanout 确实有数据了,尽管现在的消息传递速率为 0。

 

点进去:

 

 

 

可以看到一个路由信息,但是因为没有任何 Queue 绑定到这个 Exchange,这个消息就丢失了,因为消息无处可发。

  

重建协调器

在最早几节内容中,我做了一个非常简单的协调器程序,它可以简单的发布和接收消息。为了配合我们的应用场景,我们需要建立一个更健壮一些的协调器。它的主要职责是:通过消息代理(RabbitMQ)与传感器进行交互。

不过首先,为了代码复用,我对现有的项目结构进行调整:

 

 

 

我把项目的外层目录名从 sensors 改为 demo,然后在里面建立sensors 文件夹,把 main.go 移动到 sensors 里面,并改名为 sensor.go。

 

然后建立 coordinator 文件夹,在里面建立 queuelistener.go 文件,内容较多,我分为三个图展示:

 

 

 

  1. 第 15 行,建立 QueueListener struct,它里面包含发现传感器数值 Queue 的逻辑,接收它们的消息,并把它们在一个事件聚合器里面翻译成事件。不过目前它主要聚焦获取消息这项工作,所以它有三个字段:

    1. 到 RabbitMQ 的连接

    2. 在该连接上的 Channel

    3. 一个 Map,当作注册表,里面存放着这个协调器所监听的源,使用 Map 可以防止将同一个传感器注册两次,而当传感器下线的时候可以通过这个 Map 来关闭监听(这个我就不实现了)

  2. 第 21 行,建立一个构造函数,它可以返回一个 *QueueListener

 

 

 第 31 行创建一个方法 ListenForNewSource:

  1. 它可以让 QueueListener 发现新的传感器,在这里创建 Queue 的时候,我们不关心 Queue 的名称,所以 name 参数为“”,这样的话 RabbitMQ 会为它创建一个唯一的名称。

  2. 但是当 Queue 被创建时,它会默认绑定到 Direct Exchange。而在之前,我刚把代码修改为让传感器通过 amq.fanout Exchange 来发布它们的信息,所以我们需要把这个 Queue 重新绑定到那个上面。这里就使用 Channel 上的 QueueBind 方法来实现(第 33 行)。

  3. QueueBind 方法参数:

    1. 第一个参数是刚刚创建的 Queue 的名称,这就是要绑定的 Queue

    2. 第二个参数是路由 Key,由于 Fanout Exchange 会忽略这个参数,所以这里写“”

    3. 第三个参数是要绑定的 Exchange 的名称,也就是 amq.fanout

    4. 第四个参数,如果把 noWait 设置为 true,那么万一绑定不成功,就会把 Channel 关闭。这里我把它设为 false,因为我知道 Exchange 和 Queue 都会存在,如果失败,那么会关闭 Channel 并发生错误。

    5. 第五个参数不需要,设为 nil

  4. 第 40 行,设置消息的接收,返回 Go Channel,这里的参数需要用到 Queue 的名称

  5. 第 49 行,通过 for range 来处理通过 Go Channel 发过来的消息。如果接收到消息,表示有新的传感器上线了。

  6. 第 50 行,在有传感器上线后,通过 Consume 方法和 msg.Body(也就是传感器的名称),来读取传感器的模拟数据。记得我们把传感器的模拟数据发布到了默认的 Direct Exchange 上面,所以每次只会把消息传递给一个接收者,这意味着,当我注册了多个协调器的时候,它们将共享到这些 Queue 的访问,当这些发生的时候,RabbitMQ 将会轮流传送给每一个注册的接收者。这也就允许我们对协调器进行横向扩展,而且不影响整个系统其余的部分。

  7. 第 59 行,判断传感器是否在该协调器中注册,如果没有,那就进行注册。

  8. 第 62 行,使用 goroutine 来调用 AddListener 方法,该方法代码如下:

 

 

  1. 这个方法将会监听 Go Channel 中的消息

  2. 在里面使用 for range 来等待 Go Channel 传送消息

  3. 在这里,我们把二进制数据转化为我们可以在程序里使用的数据,也就是     SensorMessage 类型

  4. 然后暂时先打印即可

 

建立协调器的 main

在 coordinator 目录下建立 exec 文件夹,目的是创建 main package,在里面创建 main.go 代码如下:

 

 

  1. 第 9 行,我们创建一个 QueueListener

  2. 第 10 行,使用 goroutine 让他进行监听,防止阻塞主线程

  3. 第 12-13 行的目的就是让程序一直存活,防止 goroutine 停止运行。

最后 sensor.go 里面有一处代码需要修改,在 main 函数的 for 循环里面,每次使用 encoder 的时候都需要 重新创建一个,所以我添加了 63 行的代码:

 

 

运行 

我们运行一下试试,注意:一定要先运行 coordinator 项目,然后再运行 sensors 项目,否则会有问题。 下面左侧是 coordinator,右侧是 sensors:

 

 

可以看到 coordinator(协调器)可以读取到传感器的数据了。 

这里我们使用了一个最简单最基本的机制来做传感器 Queue 的发现。

 
 
 

相关文章: