1.首先客户端连接,连接之后会发送discover消息类型,消息是发送给nats server(不是nats streaming server)。

nats streaming消息发布

这里会返回得到nats streaming server所有消息类型的对象(pub,sub,unsub,subClose等),客户端发送消息就是发送到对应object对象。这些对象是由nats streaming server发送给nats server保存的,具体发送如下,

nats streaming消息发布

2.发布消息,根据上一步得到的消息类型对象,发布pub消息(变量pubPrefix),nats收到消息后查找接收队列,发现只有nats streaming server订阅了此消息,随即转发给nats streaming server。

nats streaming消息发布

nats streaming消息发布

3.nats streaming server发布消息处理,主要是在ioLoop函数,监听s.ioChannel通道,消息到来会通过这个通道发送,然后调用storeIOPendingMsgs函数查找对应的监听通道,通道里保存了客户端发布订阅时保存的订阅对象列表channel.subStore.psubs,保存了每个订阅对象的对象类型,转发给nats server,最终到达订阅客户端。

nats streaming消息发布

 

nats streaming消息发布

注:这里最重要的就是nats streaming server定义了一个全局消息类型在nats server,所有的客户端发送消息是发给这个全局对象,nats streaming server收到消息后在根据具体订阅对象分发一个完整流程:

nats streaming client->nats server->nats streaming server->nats server->nats streaming client。

相关文章: