【发布时间】:2018-06-21 12:54:39
【问题描述】:
我们最近开始为其中一个应用程序开发 Typescript 语言,该应用程序需要在服务器和客户端/客户端之间进行队列通信。
为了实现队列通信,我们尝试使用 ZeroMQ 库版本 4.6.0 作为 npm 包:npm install -g zeromq and npm install -g @types/zeromq。
具体场景:
客户端将通过 ZeroMQ 向服务器发送数千条消息。反过来,服务器将响应来自客户端的每个传入消息的一些确认消息。根据确认消息,客户端将发送下一条消息。
使用的 ZeroMQ 模式:
ROUTER/DEALER 模式(我们不能使用任何其他模式)。
客户端代码:
import Zmq = require('zeromq');
let clientSocket : Zmq.Socket;
let messageQueue = [];
export class ZmqCommunicator
{
constructor(connString : string)
{
clientSocket = Zmq.socket('dealer');
clientSocket.connect(connString);
clientSocket.on('message', this.ReceiveMessage);
}
public ReceiveMessage = (msg) => {
var argl = arguments.length,
envelopes = Array.prototype.slice.call(arguments, 0, argl - 1),
payload = arguments[0];
var json = JSON.parse(msg.toString('utf8'));
if(json.type != "error" && json.type =='ack'){
if(messageQueue.length>0){
this.Dispatch(messageQueue.splice(0, 1)[0]);
}
}
public Dispatch(message) {
clientSocket.send(JSON.stringify(message));
}
public SendMessage(msg: Message, isHandshakeMessage : boolean){
// The if condition will be called only once for the first handshake message. For all other messages, the else condition will be called always.
if(isHandshakeMessage == true){
clientSocket.send(JSON.stringify(message));
}
else{
messageQueue.push(msg);
}
}
}
在服务器端,我们已经配置了 ROUTER 套接字。
上面的代码非常简单。 SendMessage() 函数实际上被调用了数千条消息,并且代码成功运行,但内存消耗负载。
问题:
因为 ZeroMQ 的行为是异步,客户端必须等待回调调用 ReceiveMessage() 每当它必须发送向 ZeroMQ ROUTER 发送一条新消息(从流向方法 Dispatch 的流程中可以明显看出)。
基于我们对 TypeScript 的有限了解以及 ZeroMQ 与 TypeScript 的使用,问题在于因为默认线程运行 typescript 代码(创建所需的 1000 多条消息并发送到 SendMessage())在发送第一条消息(本质上是握手消息)后继续执行(创建和发送更多消息),除非所有 1000 多条消息都创建并发送到 SendMessage()(不是发送数据而是发送数据)排队数据,因为我们要解释路由器套接字发送的确认消息,并且仅基于我们要发送下一条消息的确认),调用不会到达@987654330 @ 回调方法。
也就是说,只有在默认线程创建和调用SendMessage()完成1000+消息之后,调用才会到达ReceiveMessage(),现在没有其他任务了做任何进一步的事情。
由于 ZeroMQ 不提供任何使用ROUTER/DEALER 发送/接收数据的同步机制,我们必须按照上述代码使用 messageQueue 对象来使用队列。
这种机制将在内存中加载一个巨大的messageQueue(包含 1000 多条消息),并且只有在默认线程最后到达 ReceiveMessage() 调用后才会出列。如果说我们有 10000 多条甚至更多的消息要发送,情况只会变得更糟。
问题:
我们确实验证了这种行为。因此,我们确信我们在上面解释过的理解。我们对 TypeScript 或 ZeroMQ 用法的理解是否存在差距?
Typescript 中是否有任何概念,例如阻塞队列/有限大小的数组,它会在队列中获取有限的条目,并阻止任何新添加到队列中,直到现有的队列成为队列(这基本上适用于默认线程暂停其处理,直到回调
ReceiveMessage()被调用,这将从队列中取出条目)?是否有任何同步 ZeroMQ 方法(我们在类似的 C# 设置中使用它,我们在 ZeroMQ 上汇集并同步接收数据)?
在这种情况下使用多线程有什么线索吗?不确定 Typescript 是否在很大程度上支持多线程。
注意:我们在许多论坛上进行了搜索,但在任何地方都没有任何线索。以上描述可能一个问题中有多个问题(违反stackoverflow论坛规则);但对我们来说,所有这些问题都与在 Typescript 中有效使用 ZeroMQ 相关联。
期待从社区中获得一些线索。
【问题讨论】:
标签: node.js typescript zeromq blockingqueue