【问题标题】:Websocket transport reliability (Socket.io data loss during reconnection)Websocket 传输可靠性(重新连接期间的 Socket.io 数据丢失)
【发布时间】:2014-01-08 05:29:29
【问题描述】:

二手

NodeJS、Socket.io

问题

假设有 2 个用户 U1U2,通过 Socket.io 连接到一个应用程序。算法如下:

  1. U1 完全失去互联网连接(例如关闭互联网)
  2. U2U1 发送消息。
  3. U1 尚未收到消息,因为 Internet 已关闭
  4. 服务器检测U1通过心跳超时断开连接
  5. U1 重新连接到 socket.io
  6. U1 从未收到来自 U2 的消息 - 我猜它在第 4 步中丢失了。

可能的解释

我想我明白为什么会这样:

  • 在第 4 步中,Server 将终止套接字实例和发送到 U1 的消息队列
  • 此外,在第 5 步 U1Server 会创建新连接(它不会被重复使用),因此即使消息仍在排队,之前的连接也会丢失。

需要帮助

如何防止此类数据丢失?我必须使用心跳,因为我不会让人们永远挂在应用程序中。此外,我仍然必须提供重新连接的可能性,因为当我部署新版本的应用程序时,我希望零停机时间。

附:我称之为“消息”的东西不仅仅是我可以存储在数据库中的文本消息,而是有价值的系统消息,必须保证传递,否则 UI 会搞砸。

谢谢!


加法1

我已经有一个用户帐户系统。而且,我的应用程序已经很复杂了。添加离线/在线状态无济于事,因为我已经有了这种东西。问题是不同的。

查看第 2 步。在这一步中,从技术上讲,我们不能说 U1 是否离线,他只是失去连接让我们说 2 秒钟,可能是因为网络不好。所以 U2 向他发送了一条消息,但 U1 没有收到它,因为他的互联网仍然关闭(第 3 步)。需要第 4 步来检测离线用户,假设超时为 60 秒。最终,再过 10 秒,U1 的互联网连接就建立起来了,他重新连接到了 socket.io。但是来自 U2 的消息在空间中丢失了,因为服务器 U1 因超时而断开连接。

这就是问题所在,我不想 100% 交付。


解决方案

  1. 在 {} 用户中收集发射(发射名称和数据),由随机发射 ID 标识。发送发射
  2. 在客户端确认发射(使用 emitID 将发射发送回服务器)
  3. 如果确认 - 从由 emitID 标识的 {} 中删除对象
  4. 如果用户重新连接 - 为该用户检查 {} 并循环遍历它,为 {} 中的每个对象执行第 1 步
  5. 当断开连接或/和连接时,如有必要,为用户刷新 {}
// Server
const pendingEmits = {};

socket.on('reconnection', () => resendAllPendingLimits);
socket.on('confirm', (emitID) => { delete(pendingEmits[emitID]); });

// Client
socket.on('something', () => {
    socket.emit('confirm', emitID);
});

解决方案 2(有点)

于 2020 年 2 月 1 日添加。

虽然这不是 Websockets 的真正解决方案,但有人可能仍然觉得它很方便。我们从 Websockets 迁移到 SSE + Ajax。 SSE 允许您从客户端连接以保持持久的 TCP 连接并实时接收来自服务器的消息。要将消息从客户端发送到服务器 - 只需使用 Ajax。存在延迟和开销等缺点,但 SSE 保证可靠性,因为它是 TCP 连接。

由于我们使用 Express,因此我们将此库用于 SSE https://github.com/dpskvn/express-sse,但您可以选择适合您的库。

IE 和大多数 Edge 版本不支持 SSE,因此您需要一个 polyfill:https://github.com/Yaffle/EventSource

【问题讨论】:

  • 确实如此。但是 socket.io 实际上只是一个传输协议。它本身并不能保证一致和可靠的消息传递。您应该查看(并阅读)发布-订阅(发布-订阅)架构和消息队列。在实践中,您将使用像 redis 这样的持久性数据库来存储消息。
  • 那么 pubsub 会解决这个问题吗?如果您写了一个全面的答案并且解决方案有效,您将获得奖励(50 分)。
  • 如此精美的问题
  • 谢谢。我必须说接受的答案对我有用。我目前使用建议的方案,没有问题。
  • 嗨,伊戈尔!我是 Node.js 和 Socket.io 的新手。如果可能的话,你能显示你的代码吗:)

标签: node.js websocket socket.io sse eventsource


【解决方案1】:

其他人在其他答案和 cmets 中暗示了这一点,但根本问题是 Socket.IO 只是一种交付机制,您不能单独依赖它来实现可靠的交付。唯一能确定消息已成功传递给客户的人是客户自己。对于这种系统,我建议做出以下断言:

  1. 消息不直接发送给客户端;相反,它们会被发送到服务器并存储在某种数据存储中。
  2. 客户端负责在重新连接时询问“我错过了什么”,并将查询数据存储中存储的消息以更新其状态。
  3. 如果在接收方客户端连接时将消息发送到服务器,该消息将实时发送到客户端。

当然,根据您的应用程序的需要,您可以调整其中的一部分——例如,您可以使用 Redis 列表或排序集来处理消息,如果您知道一个事实,则可以清除它们客户端是最新的。


这里有几个例子:

幸福之路

  • U1 和 U2 都连接到系统。
  • U2 向服务器发送一条 U1 应该接收的消息。
  • 服务器将消息存储在某种持久性存储中,用某种时间戳或顺序 ID 为 U1 标记它。
  • 服务器通过Socket.IO向U1发送消息。
  • U1 的客户端确认(可能通过 Socket.IO 回调)它收到了消息。
  • 服务器从数据存储中删除持久化消息。

离线路径

  • U1 失去互联网连接。
  • U2 向服务器发送一条 U1 应该接收的消息。
  • 服务器将消息存储在某种持久性存储中,用某种时间戳或顺序 ID 为 U1 标记它。
  • 服务器通过Socket.IO向U1发送消息。
  • U1 的客户端没有确认收货,因为他们处于离线状态。
  • 也许 U2 向 U1 发送了更多消息;它们都以相同的方式存储在数据存储中。
  • 当 U1 重新连接时,它会询问服务器“我看到的最后一条消息是 X / 我有状态 X,我错过了什么。”
  • 服务器根据 U1 的请求向 U1 发送它从数据存储中遗漏的所有消息
  • U1 的客户端确认收到,服务器从数据存储中删除这些消息。

如果您绝对希望有保证的交付,那么重要的是设计您的系统,使连接实际上并不重要,实时交付只是一个奖励;这几乎总是涉及某种数据存储。正如 user568109 在评论中提到的那样,有一些消息传递系统可以抽象出所述消息的存储和传递,并且可能值得研究这种预构建的解决方案。 (您可能仍然需要自己编写 Socket.IO 集成。)

如果您对将消息存储在数据库中不感兴趣,则可以将它们存储在本地数组中;服务器尝试向 U1 发送消息,并将其存储在“待处理消息”列表中,直到 U1 的客户端确认它收到它。如果客户端离线,那么当它回来时,它可以告诉服务器“嘿,我已断开连接,请将我错过的任何内容发送给我”,然后服务器可以遍历这些消息。

幸运的是,Socket.IO 提供了一种机制,允许客户端“响应”类似于原生 JS 回调的消息。这是一些伪代码:

// server
pendingMessagesForSocket = [];

function sendMessage(message) {
  pendingMessagesForSocket.push(message);
  socket.emit('message', message, function() {
    pendingMessagesForSocket.remove(message);
  }
};

socket.on('reconnection', function(lastKnownMessage) {
  // you may want to make sure you resend them in order, or one at a time, etc.
  for (message in pendingMessagesForSocket since lastKnownMessage) {
    socket.emit('message', message, function() {
      pendingMessagesForSocket.remove(message);
    }
  }
});

// client
socket.on('connection', function() {
  if (previouslyConnected) {
    socket.emit('reconnection', lastKnownMessage);
  } else {
    // first connection; any further connections means we disconnected
    previouslyConnected = true;
  }
});

socket.on('message', function(data, callback) {
  // Do something with `data`
  lastKnownMessage = data;
  callback(); // confirm we received the message
});

这与上一个建议非常相似,只是没有持久数据存储。


您可能还对event sourcing 的概念感兴趣。

【讨论】:

  • 我已经等待最终的综合答复,并附上一份声明,即客户必须确认交货。看来真的没有别的办法了。
  • 很高兴有帮助! Give me a ping如果您有任何问题。
  • 这将适用于一对一的聊天场景。房间示例中发生的情况,其中消息发送给多个用户。广播/socket.in 不支持回调。那么我们如何处理这种情况呢?我的问题。 (stackoverflow.com/questions/43186636/…)
  • @MichelleTilley 嗨,我知道已经有一段时间了,但您的回答很中肯,非常感谢您的澄清。我已经成功实现了 socket.io,但是如果用户 1 在聊天之外并且用户 2 在聊天内,并且用户 2 向用户 1 发送了一个味精,用户 1 在从数据库中获取它之前不会知道味精,我只有 1 个问题或者如果 user1 也在聊天中。 stackoverflow.com/questions/65682609/…你能看看这个问题吗,真的很感激
【解决方案2】:

Michelle 的回答非常中肯,但还有一些其他重要的事情需要考虑。要问自己的主要问题是:“我的应用程序中的用户和套接字之间有区别吗?”另一种询问方式是“每个登录用户一次可以有多个套接字连接吗?”

在网络世界中,单个用户可能总是有多个套接字连接,除非您专门放置了一些东西来防止这种情况发生。最简单的示例是,如果用户打开了同一页面的两个选项卡。在这些情况下,您不必只向人类用户发送一次消息/事件......您需要将其发送到该用户的每个套接字实例,以便每个选项卡都可以运行它的回调来更新 ui 状态。也许这对某些应用程序来说不是问题,但我的直觉说这对大多数应用程序来说都是如此。如果您对此感到担忧,请继续阅读......

要解决这个问题(假设您使用数据库作为持久存储),您需要 3 个表。

  1. 用户 - 与真人一对一
  2. clients - 代表一个“选项卡”,可以与套接字服务器建立单一连接。 (任何“用户”可能有多个)
  3. 消息 - 需要发送给客户端的消息(不是需要发送给用户或套接字的消息)

如果您的应用不需要,用户表是可选的,但 OP 说他们有。

需要正确定义的另一件事是“什么是套接字连接?”、“何时创建套接字连接?”、“何时重用套接字连接?”。 Michelle 的伪代码使套接字连接看起来可以重用。使用 Socket.IO,它们不能被重用。我已经看到了很多混乱的根源。在现实生活中,米歇尔的例子确实有意义。但我不得不想象这些情况很少见。真正发生的是当套接字连接丢失时,该连接、ID 等将永远不会被重用。因此,任何专门为该套接字标记的消息永远不会传递给任何人,因为当最初连接的客户端重新连接时,他们会获得一个全新的连接和新的 ID。这意味着您可以通过多个套接字连接来跟踪客户端(而不是套接字或用户)。

因此,对于基于 Web 的示例,这里将是我推荐的一组步骤:

  • 当用户加载有可能创建套接字连接的客户端(通常是单个网页)时,将一行添加到链接到其用户 ID 的客户端数据库中。
  • 当用户真正连接到套接字服务器时,将客户端 ID 与连接请求一起传递给服务器。
  • 服务器应验证用户是否被允许连接,并且客户端表中的客户端行可用于连接,并相应地允许/拒绝。
  • 使用 Socket.IO 生成的套接字 ID 更新客户端行。
  • 发送连接到客户端 ID 的消息表中的任何项目。初始连接时不会有任何连接,但如果这是来自尝试重新连接的客户端,可能会有一些连接。
  • 任何时候需要向该套接字发送消息时,请在消息表中添加一行,该行与您生成的客户端 ID(而不是套接字 ID)相关联。
  • 尝试发出消息并通过确认侦听客户端。
  • 收到确认后,从消息表中删除该项目。
  • 您可能希望在客户端创建一些逻辑来丢弃从服务器发送的重复消息,因为正如某些人所指出的那样,这在技术上是一种可能性。
  • 那么当客户端从套接字服务器断开连接时(故意或通过错误),不要删除客户端行,最多只清除套接字ID。这是因为同一个客户端可能会尝试重新连接。
  • 当客户端尝试重新连接时,发送与原始连接尝试相同的客户端 ID。服务器会将其视为初始连接。
  • 当客户端被销毁(用户关闭选项卡或导航离开)时,即删除客户端行和该客户端的所有消息。这一步可能有点棘手。

因为最后一步很棘手(至少以前是这样,我已经很长时间没有做过类似的事情了),并且因为在某些情况下,例如掉电,客户端会在不清理客户端行的情况下断开连接并且永远不要尝试与同一客户端行重新连接 - 您可能希望定期运行以清理任何过时的客户端和消息行。或者,您可以永久存储所有客户端和消息,并适当地标记它们的状态。

所以要明确一点,如果一个用户打开了两个选项卡,您将向消息表中添加两条相同的消息,每条消息都标记为不同的客户端,因为您的服务器需要知道每个客户端是否收到它们,而不仅仅是每个用户。

【讨论】:

    【解决方案3】:

    正如在另一个答案中已经写的那样,我也相信您应该将实时视为奖励:系统也应该能够在没有实时的情况下工作。

    我正在为一家大公司(ios、android、web 前端和 .net core + postGres 后端)开发企业聊天,并且在开发了一种让 websocket 重新建立连接的方法(通过套接字 uuid)和获取未传递的消息(存储在队列中) 我知道有一个更好的解决方案:通过 rest API 重新同步。

    基本上我最终只使用了实时 websocket,在每条实时消息(用户在线、打字机、聊天消息等)上都有一个整数标签,用于监控丢失的消息。

    当客户端获得一个非单体 (+1) 的 id 时,它会理解它不同步,因此它会丢弃所有套接字消息并通过 REST api 要求其所有观察者重新同步。

    这样我们可以在离线期间处理应用程序状态的许多变化,而不必在重新连接时连续解析大量的 websocket 消息,并且我们肯定会被同步(因为最后一个同步日期是由REST api,而不是来自套接字)。

    唯一棘手的部分是从调用 REST api 到服务器回复的那一刻监控实时消息,因为从 db 读取的内容需要时间才能返回到客户端,同时可能会发生变化,因此他们需要被缓存并考虑在内。

    我们将在几个月后投入生产, 我希望到那时能回去睡觉:)

    【讨论】:

      【解决方案4】:

      您似乎已经拥有用户帐户系统。知道哪个账号在线/离线,就可以处理connect/disconnect事件了:

      所以解决方案是,在数据库中为每个用户添加在线/离线和离线消息:

      chatApp.onLogin(function (user) {
         user.readOfflineMessage(function (msgs) {
             user.sendOfflineMessage(msgs, function (err) {
                 if (!err) user.clearOfflineMessage();
             });
         })
      });
      
      chatApp.onMessage(function (fromUser, toUser, msg) {
         if (user.isOnline()) {
            toUser.sendMessage(msg, function (err) {
                // alert CAN NOT SEND, RETRY?
            });
         } else {
            toUser.addToOfflineQueue(msg);
         }
      })
      

      【讨论】:

      • 请阅读我的问题中的“附加 1”部分。我不认为你的答案是一个解决方案。
      • 这很有趣,我现在开始我自己的聊天项目,也许使用网络 RTC :->
      • 我也在 WebRTC 上。但在这种情况下,这并不重要。啊......如果所有人都有稳定的互联网......当用户在 Speedtest 上拥有 100Mbps 时我很沮丧,但实际上如果他们尝试 ping 他们有 20% 的数据包丢失。谁需要这样的互联网? =)
      【解决方案5】:

      看这里:Handle browser reload socket.io

      我认为您可以使用我提出的解决方案。如果你修改得当,它应该可以按你的意愿工作。

      【讨论】:

      • 这很有趣,我找不到这个问题,但搜索了几个小时。会看的!
      • 我好像已经在用这种架构了。它不能解决我描述的确切问题。
      【解决方案6】:

      我认为您想要的是为每个用户提供一个可重复使用的套接字,例如:

      客户:

      socket.on("msg", function(){
          socket.send("msg-conf");
      });
      

      服务器:

      // Add this socket property to all users, with your existing user system
      user.socket = {
          messages:[],
          io:null
      }
      user.send = function(msg){ // Call this method to send a message
          if(this.socket.io){ // this.io will be set to null when dissconnected
              // Wait For Confirmation that message was sent.
              var hasconf = false;
              this.socket.io.on("msg-conf", function(data){
                  // Expect the client to emit "msg-conf"
                  hasconf = true;
              });
              // send the message
              this.socket.io.send("msg", msg); // if connected, call socket.io's send method
              setTimeout(function(){
                  if(!hasconf){
                      this.socket = null; // If the client did not respond, mark them as offline.
                      this.socket.messages.push(msg); // Add it to the queue
                  }
              }, 60 * 1000); // Make sure this is the same as your timeout.
      
          } else {
              this.socket.messages.push(msg); // Otherwise, it's offline. Add it to the message queue
          }
      }
      user.flush = function(){ // Call this when user comes back online
          for(var msg in this.socket.messages){ // For every message in the queue, send it.
              this.send(msg);
          }
      }
      // Make Sure this runs whenever the user gets logged in/comes online
      user.onconnect = function(socket){
          this.socket.io = socket; // Set the socket.io socket
          this.flush(); // Send all messages that are waiting
      }
      // Make sure this is called when the user disconnects/logs out
      user.disconnect = function(){
          self.socket.io = null; // Set the socket to null, so any messages are queued not send.
      }
      

      然后在断开连接之间保留套接字队列。

      确保将每个用户的socket 属性保存到数据库中,并使方法成为用户原型的一部分。数据库无关紧要,只需保存它,但您一直在保存您的用户。

      这将通过在将消息标记为已发送之前要求客户端确认来避免 Additon 1 中提到的问题。如果你真的想的话,你可以给每条消息一个id,然后让客户端把消息id发给msg-conf,然后检查一下。

      在本例中,user 是模板用户,所有用户都从中复制,或类似于用户原型。

      注意:这未经测试。

      【讨论】:

      • 你能告诉我真正的“用户”变量是什么吗?
      • 实际上,我认为您解决了我的问题。但是您能否也为每段代码提供一些 cmets ?我还不明白如何将它集成到我的代码中。另外我应该在哪里将它保存到数据库中,你的意思是哪种数据库? Redis 或可能是 Mongo 或无关紧要?
      • 还是没有解决问题。发送消息时,两个用户(发送者和接收者)对于服务器都是在线的。请仔细阅读我的问题中的附加 1。在这种情况下,“this.socket.io”将始终为“true”,因此正在发送消息,但未收到消息。您尝试解决问题,当 SENDER 下线时,而不是 RECEIVER。还是我不对?
      • @igorpavlov,对不起,你误会了我。想象一下:U1 想向 U2 发送一条消息“Hi”:users.getUserByName("U2").send("Hi")。那么如果U2在线,U2的socket.io就不会为空,所以会发送消息。如果 U2 的 socket 为 null,那么它将排队等待 U2 上线。
      • 我相信@igorpavlov 是对的。会有一段时间客户端实际断开连接,但是服务器不知道,因为心跳还没有发生。在这段时间内,this.socket.ionull,服务器将尝试传递消息。
      【解决方案7】:

      最近一直在看这些东西,并认为不同的路径可能会更好。

      尝试查看 Azure 服务总线,查询和主题处理离线状态。 消息等待用户回来,然后他们收到消息。

      运行队列是一项成本,但对于基本队列而言,每百万次操作需要 0.05 美元,因此开发成本将更多来自编写队列系统所需的工作时间。 https://azure.microsoft.com/en-us/pricing/details/service-bus/

      azure bus 有 PHP、C#、Xarmin、Anjular、Java Script 等的库和示例。

      所以服务器发送消息并且不需要担心跟踪它们。 客户端可以使用消息发回,也可以在需要时处理负载平衡。

      【讨论】:

      • 在我看来,它就像是植入式广告。有人可能会觉得这很有帮助,但这甚至不是一项技术,而是一项整体服务,也是有偿的。
      【解决方案8】:

      试试这个发出聊天列表

      io.on('connect', onConnect);
      
      function onConnect(socket){
      
        // sending to the client
        socket.emit('hello', 'can you hear me?', 1, 2, 'abc');
      
        // sending to all clients except sender
        socket.broadcast.emit('broadcast', 'hello friends!');
      
        // sending to all clients in 'game' room except sender
        socket.to('game').emit('nice game', "let's play a game");
      
        // sending to all clients in 'game1' and/or in 'game2' room, except sender
        socket.to('game1').to('game2').emit('nice game', "let's play a game (too)");
      
        // sending to all clients in 'game' room, including sender
        io.in('game').emit('big-announcement', 'the game will start soon');
      
        // sending to all clients in namespace 'myNamespace', including sender
        io.of('myNamespace').emit('bigger-announcement', 'the tournament will start soon');
      
        // sending to individual socketid (private message)
        socket.to(<socketid>).emit('hey', 'I just met you');
      
        // sending with acknowledgement
        socket.emit('question', 'do you think so?', function (answer) {});
      
        // sending without compression
        socket.compress(false).emit('uncompressed', "that's rough");
      
        // sending a message that might be dropped if the client is not ready to receive messages
        socket.volatile.emit('maybe', 'do you really need it?');
      
        // sending to all clients on this node (when using multiple nodes)
        io.local.emit('hi', 'my lovely babies');
      
      };

      【讨论】:

        猜你喜欢
        • 2022-08-23
        • 2021-05-24
        • 2017-02-12
        • 1970-01-01
        • 1970-01-01
        • 2017-09-08
        • 2015-07-21
        • 1970-01-01
        • 2017-08-18
        相关资源
        最近更新 更多