【问题标题】:Node.js + socket.io + node-amqp and queue binginds when "re" connecting thru socket.ioNode.js + socket.io + node-amqp 并在通过 socket.io “重新”连接时排队
【发布时间】:2011-11-27 08:49:21
【问题描述】:

我有一个非常接近这个示例的场景:

一个主屏幕:

  • 此屏幕(客户端)将通过 server:9090/scope (io.connect("http://server:9090/scope)) 连接到 socket.io 服务器,并将发送一个事件“userBindOk” " (socket.emit("userBindOk", message)) 到socket.io服务器;

  • 服务器接收连接和“userBindOk”。此时,服务器应该获得到rabbitmq服务器的活动连接,并将队列绑定到刚刚通过socket.io连接到应用程序的相应用户。示例:

    socket.on("连接", function(client){ //客户端ID为1234 // 绑定 rabbitmq 交换、队列和: queue.subscribe(//接收回调); })

  • 到目前为止,没问题 - 我可以毫无问题地通过 socket.io 发送/接收消息。

  • 但是,如果我刷新页面,所有这些步骤都将再次完成。结果,将发生与队列的绑定,但这一次与 socket.io 客户端的另一个会话有关。这意味着如果我向与第一个 socket.io 会话相关的队列发送消息(在页面刷新之前),则该绑定应该(我认为)接收消息并将其发送到无效的 socket.io 客户端(页面在 socket.io 上下文中刷新 = new client.id)。我可以证明这种行为,因为每次刷新页面时,我都需要发送 x 倍的消息。例如:我第一次连接: - 所以,1 条消息 - 一个屏幕更新;刷新页面:我需要向队列发送 2 条消息,并且只会从“实际”socket.io 客户端会话接收到第二条消息 - 这种行为会在我刷新页面时发生(20 条页面刷新,20 条消息发送到队列,服务器socket.io“last”客户端将消息发送到客户端socket.io渲染到屏幕上)。

我认为的解决方案是:

  • 在从 socket.io 服务器断开连接时找到一种“解除绑定”队列的方法 - 我在 node-amqp api 上还没有看到这个选项(等待它:D)

  • 找到一种使用相同 client.id 重新连接 socket.io 客户端的方法。这样我就可以识别即将到来的客户端并应用一些逻辑来缓存套接字。

有什么想法吗?我试图说得很清楚...但是,正如您所知,在尝试澄清某些特定于某些上下文的问题时,暴露您的问题并不是那么容易...

tks

【问题讨论】:

    标签: rabbitmq socket.io amqp


    【解决方案1】:

    我是这样解决的:

    我曾经将rabbitMq队列声明为durable=true,autoDelete=false,exclusive=false,并且在我的应用程序中有1个队列/用户和1个交换(类型=直接),routing_key name=queueName,我的应用程序也是使用与 android 应用程序或 iphone 应用程序等浏览器不同的其他客户端的队列作为推送后备,因此我使用为每个用户创建 1 个队列。

    解决这个问题的方法是改变我的 rabbitMQ 队列和交换声明。现在我将交换/用户声明为 fanout 和 autoDelete=True,并且用户将有 N 个队列,其中持久性 =true、autoDelete=true、exclusive=true(No.queue = No.clients)并且所有队列都是绑定的到用户交换(多播)。

    注意:我的应用程序是在 django 中编写的,我使用 node+socket+amqp 能够使用 web.scokets 与浏览器进行通信,所以我使用 node-restler 来查询我的应用程序 api 以获取用户队列信息。

    这就是 rabbitMQ 方面,对于 node+amqp+socket 我这样做了:

    服务器端:

    • onConnect:声明用户交换为扇出、自动删除、持久。然后将队列声明为持久、自动删除和独占,然后 queue.bind 到用户交换,最后 queue.subscribe 和 socket.disconnect 将销毁队列,因此当客户端连接应用程序和这解决了刷新问题并允许用户在应用中拥有多个窗口选项卡:

    服务器端:

                /*
                 * unCaught exception handler
                 */
    
                process.on('uncaughtException', function (err) {
                    sys.p('Caught exception: ' + err);
                    global.connection.end();
                });
    
    
                /*
                 * Requiere libraries
                 */
    
                global.sys =  require('sys');
                global.amqp = require('amqp');
                var rest = require('restler');
                var io = require('socket.io').listen(8080);
    
                /*
                 * Module global variables
                 */
                global.amqpReady = 0;
    
    
                /*
                 * RabbitMQ connection
                 */
    
                global.connection = global.amqp.createConnection({
                                 host: host,
                                 login: adminuser,
                                 password: adminpassword,
                                 vhost: vhost
                                });
    
                global.connection.addListener('ready', 
                            function () {
                                sys.p("RabbitMQ connection stablished");
                                global.amqpReady = 1;
                            }
                );
    
    
                /*
                 * Web-Socket declaration
                 */ 
    
                io.sockets.on('connection', function (socket) {
                    socket.on('message', function (data) {
                        sys.p(data);
                        try{
                            var message = JSON.parse(data);                 
                        }catch(error){
                            socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400}));
                            var message = {};
                        }           
                        var message = JSON.parse(data);
                        if(message.token != undefined) {
    
                          rest.get("http://dev.kinkajougames.com/api/push",
                                    {headers: 
                                        {
                                            "x-geochat-auth-token": message.token 
                                        }
                                    }).on('complete', 
                                        function(data) {
                                            a = data;
                                    }).on('success',
                                        function (data){
                                            sys.p(data);
                                            try{                                
                                                sys.p("---- creating exchange");
                                                socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true});
                                                sys.p("---- declarando queue");
                                                socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false},
                                                    function (){
                                                        sys.p("---- bind queue to exchange");
                                                        //socket.q.bind(socket.exchange, "*");
                                                        socket.q.bind(socket.exchange, "*");
                                                        sys.p("---- subscribing queue exchange");
                                                        socket.q.subscribe(function (message) {
                                                            socket.emit("message", message.data.toString());
                                                        });     
                                                    }
                                                );
                                            }catch(err){
                                                sys.p("Imposible to connection to rabbitMQ-server");
                                            }                                   
    
                                    }).on('error', function (data){
                                        a = {
                                            data: data,
                                        };
                                    }).on('400', function() {
                                        socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400}));
                                    }).on('401', function() {
                                        socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                                    });               
                        }
                        else {
                          socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                        }
    
                    });
                    socket.on('disconnect', function () {
                        socket.q.destroy(); 
                        sys.p("closing socket");
                    });
                });
    

    客户端:

    • 带有选项“强制新连接”=true 和“卸载时同步断开连接”=false 的套接字实例。
    • 客户端使用onbeforeunload和onunload windows对象事件发送socket.disconnect
    • socket.connect 事件中的客户端将用户令牌发送到节点。
    • 处理来自套接字的消息

              var socket;
              function webSocket(){
                  //var socket = new io.Socket();
                  socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false});
                  //socket.connect();
      
                  onSocketConnect = function(){
                      alert('Connected');
                      socket.send(JSON.stringify({
                          token: Get_Cookie('liveScoopToken')
                      }));
                  };
      
                  socket.on('connect', onSocketConnect);
                  socket.on('message', function(data){
                      message = JSON.parse(data);
                      if (message.action == "chat") {
                          if (idList[message.data.sender] != undefined) {
                              chatboxManager.dispatch(message.data.sender, {
                                  first_name: message.data.sender
                              }, message.data.message);
                          }
                          else {
                              var username = message.data.sender;
                              Data.Collections.Chats.add({
                                  id: username,
                                  title: username,
                                  user: username,
                                  desc: "Chat",
                                  first_name: username,
                                  last_name: ""
                              });
                              idList[message.data.sender] = message.data.sender;
                              chatboxManager.addBox(message.data.sender, {
                                  title: username,
                                  user: username,
                                  desc: "Chat",
                                  first_name: username,
                                  last_name: "",
                                  boxClosed: function(id){
                                      alert("closing");
                                  }
                              });
                              chatboxManager.dispatch(message.data.sender, {
                                  first_name: message.data.sender
                              }, message.data.message);
                          }
                      }
                  });
              }                           
      
              webSocket();
      
              window.onbeforeunload = function() {
                  return "You have made unsaved changes. Would you still like to leave this page?";
              }
      
              window.onunload = function (){
                  socket.disconnect();
              }
      

    就是这样,所以不要再循环消息了。

    【讨论】:

      猜你喜欢
      • 2015-04-03
      • 1970-01-01
      • 2013-08-20
      • 2012-12-27
      • 1970-01-01
      • 1970-01-01
      • 2018-05-14
      • 1970-01-01
      • 2011-11-22
      相关资源
      最近更新 更多