【问题标题】:REQ/REP & DEALER/ROUTER for two-way asynchronous worker processingREQ/REP & DEALER/ROUTER 用于双向异步工作者处理
【发布时间】:2017-06-14 03:57:43
【问题描述】:

我正在学习 ZeroMQ,并且刚刚完成了教程和一些示例。我使用 Node.js 作为我的主要环境(最终用 Python 代替了我的工人)。

试图弄清楚如何创建一个完全异步的消息传递系统,该系统将允许我的 API 将任务(通过 REQ 套接字)推送到路由器,让经销商将消息传递给一个工作人员,处理消息并将其结果发送回我的客户端(这是一个快速路由)。

我相信这个模式会像这样工作(还没有测试或正确实现代码,所以请将其作为概念大纲):


路由器.js

const zmq = require('zmq');;
const frontend = zmq.socket('router');
const backend = zmq.socket('dealer');

frontend.on('message', function() {
  var args = Array.apply(null, arguments);
  backend.send(args);
});
backend.on('message', function() {
  var args = Array.apply(null, arguments);
  frontend.send(args);
});

frontend.bindSync('tcp://*:5559');
backend.bindSync('tcp://*:5560');

client.js

var zmq = require('zmq'),
var express = require('express');
var app = express();

app.post('send', function(req, res) {
  var client = zmq.socket('req');
  // listen for responses from the server 
  client.on('message', function(data) {  
     console.log(data);
     client.close();
  }); 
  // connect to the server port 
  client.connect('tcp://0.0.0.0:5454');  
  client.send('Request from ' + process.id);
});
app.listen('80');

worker.js

var zmq = require('zmq');
var server = zmq.socket('rep');

server.on('message', function(d){  
  server.send('Response from ' + process.id); 
}); 
// bind to port 5454 
server.bind('tcp://0.0.0.0:5454', function(err){  
  if (err){ 
    console.error("something bad happened"); 
    console.error( err.msg ); 
    console.error( err.stack ); 
    process.exit(0); 
  } 
});

我不完全理解的是 ROUTER/DEALER 是否会处理将响应工作人员发送到正确的客户端。同样在这种情况下,经销商处理公平排队,因为我希望我的工作平均分配给工人。

我的客户端可以分布在许多不同的盒子(负载平衡器 API 服务器)中,我的路由器将在自己的服务器上,工作人员也将分布在多个盒子中。

【问题讨论】:

    标签: zeromq


    【解决方案1】:

    在任何生产级应用程序中忘记REQ/REP,可能会陷入相互僵局

    您可能会在 REQ/REP 正式可扩展通信模式中有关高风险相互 FSM-FSM 死锁的许多其他帖子中找到这个主题。


    请务必,XREQ/XREP == DEALER/ROUTER (自 2011 年以来已经)

    源代码消除了这背后的所有隐藏魔法,XREQ == DEALERXREP == ROUTER

    +++b/include/zmq.h
    ...
    -#define ZMQ_XREQ 5
    -#define ZMQ_XREP 6
    +#define ZMQ_DEALER 5
    +#define ZMQ_ROUTER 6
    ...
    +#define ZMQ_XREQ ZMQ_DEALER        /*  Old alias, remove in 3.x     */
    +#define ZMQ_XREP ZMQ_ROUTER        /*  Old alias, remove in 3.x     */
    

    【讨论】:

    • 那我该如何避免相互的僵局呢?您能否建议我提出的模式的替代方法?
    • 死锁是不可避免的 + 无法挽救的。永远不要使用REQ/REP
    • 好的,这很令人担忧。对客户端使用 REP-PUB 对工作人员使用 REQ-SUB 怎么样?经纪人将使用 DEALER 和 ROUTER。这有可能解决问题吗?如果不是,哪种模式在这里最有意义?
    【解决方案2】:

    对于将来阅读本文的任何人,在我进一步的研究中,我偶然发现了 Majordomo 协议/模式。这正是我想要实现的。可以在此处阅读有关实施、优缺点的文档:https://rfc.zeromq.org/spec:18/MDP/。这是代理实现:https://github.com/zeromq/majordomo

    【讨论】:

      【解决方案3】:

      似乎我在使用 DEALER/ROUTER 而应该使用 XREQXREP

      broker.js

      var zmq = require('zmq');
      var frontPort = 'tcp://127.0.0.1:5559';
      var backPort = 'tcp://127.0.0.1:5560';
      
      var frontSocket = zmq.socket('xrep');
      var backSocket = zmq.socket('xreq');
      
      frontSocket.identity = 'xrep_' + process.pid;
      backSocket.identity = 'xreq_' + process.pid;
      
      frontSocket.bind(frontPort, function (err) {
          console.log('bound', frontPort);
      });
      
      frontSocket.on('message', function() {
        //pass to back
        console.log('router: sending to server', arguments[0].toString(), arguments[2].toString());
        backSocket.send(Array.prototype.slice.call(arguments));
      });
      
      backSocket.bind(backPort, function (err) {
        console.log('bound', backPort);
      });
      
      backSocket.on('message', function() {
        //pass to front
        console.log('dealer: sending to client', arguments[0].toString(), arguments[2].toString());
        frontSocket.send(Array.prototype.slice.call(arguments));
      });
      
      console.log('Broker started...');
      

      worker.js

      var zmq = require('zmq');
      var socket = zmq.socket('rep');
      
      socket.identity = 'worker_' + process.pid;
      
      socket.on('message', function(data) {
          console.log(socket.identity + ': received ' + data.toString());
          socket.send(data * 2);
      });
      socket.connect('tcp://127.0.0.1:5560', function(err) {
        if (err) throw err;
        console.log('server connected!');
      });
      
      console.log('Worker started...');
      

      client.js

      var zmq = require('zmq');
      var socket = zmq.socket('req');
      
      socket.identity = 'client_' + process.pid;
      
      socket.on('message', function(data) {
        console.log(socket.identity + ': answer data ' + data);
      });
      socket.connect('tcp://127.0.0.1:5559');
      
      setInterval(function() {
        var value = Math.floor(Math.random()*100);
        console.log(socket.identity + ': asking ' + value);
        socket.send(value);
      }, 100);
      
      console.log('Client started...');
      

      我仍然不确定在每个 API 入站请求上打开连接是否安全。

      【讨论】:

      • 在每次调用时设置 + 拆除 ZeroMQ 消息传递/信令基础设施总是一个坏主意。 ZeroMQ 到目前为止还不是消耗品/一次性用品,在所有基础设施组件达到 RTO 状态的端到端之前,每次调用都会破坏大量系统开销 + 等待额外的延迟需要付出很多代价。即使是教科书的例子也是个坏主意。永远不要宣扬这种动机不良的编程风格。
      • 那是我的确切想法和担忧。那么问题是我将如何映射对套接字的请求,以便正确的路由接收响应消息?
      猜你喜欢
      • 2014-01-03
      • 1970-01-01
      • 2014-03-05
      • 2019-12-01
      • 2023-03-22
      • 2021-11-19
      • 1970-01-01
      • 2021-03-29
      • 1970-01-01
      相关资源
      最近更新 更多