【问题标题】:zeromq message pattern for many to many request/reply多对多请求/回复的 zeromq 消息模式
【发布时间】:2013-07-31 06:22:43
【问题描述】:

我对 ZeroMQ 很陌生。我已阅读指南,目前正在浏览示例以及查看网络上的其他相关信息。我对使用哪种消息模式或是否应该使用 2 种模式的组合犹豫不决。

我有一个现有的软件应用程序,其中包含一个需要更换的自制消息传递系统。我有一个相当简单的架构:

|Client|<----->|driver1|
           |
           |---|driverN|

目前只有一个“客户端”连接到一个驱动程序,并且可能有多个驱动程序。

(实际上,在这种情况下,客户端并不是真正的我的客户端应用程序,而是某种中间人。在本次讨论中,它可以被视为客户端)

消息:

  1. 客户端向驱动程序发出命令。
  2. 驱动程序返回状态/状态信息以响应命令。
  3. 驱动程序产生数据元素(即非状态/状态信息)
  4. 一些客户端消息发送到所有连接的设备,一些只定向到单个驱动程序。

驱动程序可能存在于同一系统中,也可能存在于局域网中。这不是公共网络。

我目前正在考虑在每个驱动程序上都有一个 pub 和 sub 套接字,在客户端上有一个 sub/pub 套接字。建立连接后不应丢弃消息。我假设客户端会订阅不同的驱动程序数据类型,然后驱动程序会订阅客户端的命令消息。

重要注意事项:低延迟、尽可能低的带宽开销。

如果有任何建议或建议,我将不胜感激!提前致谢!

【问题讨论】:

  • 我在1、2、4和你在一起,但在3上输了;驱动程序产生数据元素,好的,但是需要什么消息交换?此外,在 4 上,客户端是否决定消息是发送到单个设备还是所有设备?客户端是 ZeroMq 客户端吗?
  • 对于3,产生的数据是响应客户端发送的命令。示例:客户端发送“拍摄静止图像”命令,然后驱动程序返回图像。对于 4,客户端决定与哪些设备通信。客户端将有一个 zeroMQ 组件。希望有帮助!
  • 从阅读zguide.zeromq.org/… 开始,您需要该背景来理解请求/回复语义和封装。我建议也玩这些例子,这是最好的学习方式。明天我会针对这个问题发布一些代码示例。
  • 我已阅读指南。两次。我担心我的特定系统中 REQ/REP 的额外开销。我一直在浏览示例等...无法构建一些示例,但我了解其中大多数示例的一般概念。感谢您的反馈和即将发布的代码示例!
  • 您使用什么语言绑定?顺便说一句,客户端是否可以使用逻辑名称来寻址设备,例如printer1scanner5

标签: zeromq


【解决方案1】:

你选择了一个很棒的学习练习,这是肯定的!

阅读这些内容,它们提供了使用带有轮询的自定义路由器到路由器代理的请求/回复的基本实现,并且应该解决您的客户端到设备问题。

解决方案是同步的,因此从客户端发送的任何请求都会阻塞,直到它得到响应。就我个人而言,我会为请求和回复使用异步以获得完全的灵活性,但这种解决方案要复杂得多。但是,本书中有名为 FreelanceDealer/Router 的示例说明了异步请求/回复。

这是一个同步多对多请求/回复的示例。你必须知道 ZeroMq 包络是如何工作的,才能完全理解这种方法的机制;见例子lbbroker1

客户

setIdentity()设置客户端身份;对响应路由很重要。
客户端循环发送device1device2等请求;如果设备存在,则从特定设备返回状态消息,否则返回“无设备”给客户端。

    Socket client = context.socket(ZMQ.REQ);
    client.setIdentity("client1".getBytes());
    client.connect("tcp://localhost:5550");

    for( int i = 0; i < 5; i++){
       client.send("device" + i);   
       String reply = client.recvStr();
       log("Received message: " + reply);
       Thread.currentThread().sleep(1000);
}

设备

设备设置 id 就像客户端一样用于唯一路由。
设备将device.send("DEVICEREADY") 发送到服务器以指示在线可用性。
设备执行recvStr() 3 次以从服务器读取完整的信封。

String deviceId = "device1"
Socket device = context.socket(ZMQ.REQ);
device.setIdentity(deviceId.getBytes());

device.connect( "tcp://localhost:5560");
device.send( "DEVICEREADY");

while (!Thread.currentThread().isInterrupted()) {
   String clientAddress = device.recvStr();             
   String empty = device.recvStr();
   String clientRequest = device.recvStr();

   //create envelope to send reply to same client who made request          
   device.sendMore(clientAddress);
   device.sendMore("");
   device.send( "stauts on " + deviceId + " is ok");
}

服务器(路由器/路由器)

使用 ROUTER 套接字的自定义代理;客户端连接到前端 ROUTER 套接字,而设备连接到后端路由器。服务器在两个套接字上轮询消息。

Context context = ZMQ.context(1);       
Socket frontend = context.socket(ZMQ.ROUTER);
Socket backend = context.socket(ZMQ.ROUTER);
frontend.bind( "tcp://localhost:5550");
backend.bind(  "tcp://localhost:5560");         

Poller poller = new Poller(2);  
poller.register(frontend, Poller.POLLIN);
poller.register(backend, Poller.POLLIN);

while (!Thread.currentThread().isInterrupted()) {
   poller.poll();

   //frontend poller
   if (poller.pollin(0)) {
      String clientId = frontend.recvStr();
      String empty = frontend.recvStr(); //empty frame
      String deviceId = frontend.recvStr();

      //if client is requesting to talk to nonexistent deviceId,
      //return message "no device", otherwise, create envelope and send
      //request on backend router to device.
      if( deviceMap.get( deviceId) == null ){
         frontend.sendMore(clientId);
         frontend.sendMore("");
         frontend.send("no deviceId: " + deviceId);
      } else {
        //request envelope addressed to specific device
        backend.sendMore(deviceId);
        backend.sendMore("");
        backend.sendMore(clientId);
        backend.sendMore("");
        backend.send("hello from " + clientId);
       }
   }

    //backend poller  
    if(poller.pollin(1)){
       String deviceId = backend.recvStr();
       String empty = backend.recvStr();
       String clientId = backend.recvStr();

       //device signaling it's ready
       //store deviceId in map, don't send a response
       if( clientId.equals("DEVICEREADY"))
           deviceMap.put(deviceId, deviceId);

       else {
           //the device is sending a response to a client
           //create envelope addressed to client, send on frontend socket
           empty = backend.recvStr();
           String reply = backend.recvStr();
           frontend.sendMore(clientId);
           frontend.sendMore("");
           frontend.send(reply);
        }
    }
}

【讨论】:

  • 很想看看!我精通Java。在我坚持糟糕的设计之前,我会尽可能多地了解这一点......
猜你喜欢
  • 1970-01-01
  • 2016-06-20
  • 2017-05-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-07
  • 1970-01-01
相关资源
最近更新 更多