【发布时间】:2022-03-06 02:13:30
【问题描述】:
我有一个使用 mongo changestream 监听器的 nodejs 集群服务器,通过 socket.io 向客户端发送数据。我正在使用 Redis 将所有已连接用户的 userId 和 socketId 存储在哈希中。
{ userId: 'aaa', socketId: 'bbb' }
用于存储这些数据的redis客户端在master进程中初始化。 mongo 变更流是在主进程中创建的。
当变更流看到一个新文档时,它会将文档作为消息发送给子进程。当子进程收到消息时,它可以从文档中检索 userId。有了userId,就可以从redis中获取客户端连接的socketId了。
我遇到的问题是在从 redis 检索到 socketId 后尝试使用 socketId 发出消息。 我正在创建一个包含 socketId 的 sockethandler 对象。当我使用此 socketId 发出套接字消息时,如下所示:
io.sockets.to(userSocketId)
.emit("confirmOrder", "Your order is being processed!")
我收到一个错误:
(node:31804) UnhandledPromiseRejectionWarning: 错误: 客户端已关闭 在新的 ClientClosedError (/Users/a999999999/code/*/node_modules/@node-redis/client/dist/lib/errors.js:24:9)
错误来自 redis,并且源自上面写的套接字发射行。 ^^
这里有更多来自工作进程的代码:
const pubClient = createClient({ host: "127.0.0.1", port: 6379 }),
subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));
setupWorker(io);
io.on("connection", (socket) => {
const socketId = socket.id;
socket.emit("connection", "SERVER: you are connected");
socket.on("userConnect", (user) => {
let { userId } = user;
userConnectClient
.HSET(userId, { userId, socketId })
.catch((err) => console.log("ERROR: ", err));
});
});
process.on("message", async ({ type, data }) => {
switch (type) {
case "dispatch:order":
let { order } = JSON.parse(data);
const socketsHandler = await createSocketsHandler(order);
const userSocketId = socketsHandler.user.socketId;
io.sockets
.to(userSocketId)
.emit("confirmOrder", "Your order is being processed!");
break;
}
});
async function createSocketsHandler(order) {
let { userId } = order;
let user = await userConnectClient
.HGETALL(userId)
.catch((err) => console.log(err));
return {
user: user,
};
}
此时我暂时被难住了。目前正在试验io对象,并试图找到更好的工具来监控redis。任何帮助/问题表示赞赏!谢谢!
【问题讨论】:
标签: node.js sockets asynchronous async-await redis