【问题标题】:How to create ZeroMQ socket suitable both for sending and consuming?如何创建适合发送和消费的 ZeroMQ 套接字?
【发布时间】:2020-01-21 19:26:28
【问题描述】:

您能否为以下场景推荐一个 ZeroMQ 套接字架构:

1) 端口上有服务器监听

2) 有多个客户端同时连接服务器

3) 服务器接受来自客户端的所有连接并为每个客户端提供双向队列,意味着双方(客户端 N 或服务器)都可以发送或消费消息,即双方都可以是通信的发起者,另一方应该有处理消息的回调。

我们是否应该在每个接受的连接上创建额外的 ZeroMQ 套接字以从服务器推送消息?您能否建议谷歌针对此类架构使用哪种 ZeroMQ 套接字类型?

【问题讨论】:

    标签: zeromq distributed-computing messaging distributed-system


    【解决方案1】:

    Q…在每个接受的连接上创建额外的 ZeroMQ 套接字以从服务器推送消息?

    最好的基于简单组合的设计 - 可扩展性和安全性

    原生 ZeroMQ 原语(智能原语 Scalable Formal Communications Pattern Archetypes )对我们来说就像一个乐高积木 - 我们将它们进一步用于我们应用程序域中预期目标用途的消息/信号平面。

    Q您能否向谷歌提供针对此类架构的 ZeroMQ 套接字类型的建议?

    没有,因为没有针对此类建议的详细要求列表。一对PUSH/PULL-s 本身并不足够,临时执行(偶发)REQ/REP 可能有助于客户(重新)发现阶段,就像其他共存、持久或偶发原型对作曲所做的那样任何其他系统/服务平面。

    【讨论】:

      【解决方案2】:

      我遇到了基本相同的问题。对我来说,问题似乎是即使在具有 GIL 保护的 python 中,您也不能一次从两个线程操作一个套接字。这将导致解释器崩溃。因此,一个线程不能有发送和接收线程,而是一个线程,然后必须能够同时接收和发送,而 pyzmq 无法同时 poll() 一个套接字和非套接字。

      user3666197 也回复了我的问题,但这些回复都没有任何帮助。尤其是因为不能假设有 1000 多个空闲和开放端口来为所有客户端连接生成主机套接字。

      有一个相对丑陋的解决方案。这是一个缺少适当的 NOWAIT 标志等的草案。 make_process_pull_socket() 为进程内部的通信创建了一个套接字。如下:

      class TwoWay(threading.Thread):
      def __init__(self, ip_string, port, inque:queue.Queue):
          super(TwoWay, self).__init__()
          self.sock = cntxt.socket(zmq.ROUTER)
          addr = "tcp://{}:{}".format(ip_string, str(port))
          self.sock.bind(addr)
          self.pull = make_process_pull_socket(4456)
          self.push = make_process_push_socket(4456)
          self.inque = inque
      
      def run(self):
          pl = zmq.Poller()
          pl.register(self.sock,zmq.POLLIN)
          pl.register(self.pull,zmq.POLLIN)
          while True:
              try:
                  p = pl.poll(timeout=2000)
                  if not p:
                      continue
                  s,i = p[0]
                  if s == self.sock:
                      a,m = s.recv_multipart()
                      self.inque.put((a,m))
                  if s == self.pull:
                      r = s.recv()
                      if len(r) > 5:
                          self.sock.send_multipart([r[:5], r[5:]])
              except Exception as e:
                  print(e)
      

      然后您将传入消息放入作为参数提供的 inque-queue 中,并且可以使用 TwoWay.push.send(id+msg) 发送传出消息(带有标识符)

      【讨论】:

      • 让我反对你的诬告,即 ZeroMQ 不能同时轮询 zmq.Socket()-instance 和非 zmq、本机套接字。 ZeroMQ API 文档对此很清楚,当使用 zmq.RAW 套接字类型时,可以使用可用的方法从 ZeroMQ Context() 实例内部操作本机套接字,包括。 .poll() 方法。如果这确实无法在 pyzmq 中工作,那么问题在 Python 绑定的实现或用户级应用程序代码中是没有问题的,而不是在 ZeroMQ 本身中。
      • 嗯。这可能是真的,但无关紧要。有用的是轮询 queue.Queues 或线程事件的能力。这可以通过文件描述符在 C/C++ 中实现,而不是在 python 中。轮询本机套接字将允许我在这里描述的那种内部循环结构,除了通过 localhost,它比进程内部内存复制慢(我假设)。
      猜你喜欢
      • 2020-06-08
      • 1970-01-01
      • 1970-01-01
      • 2018-08-31
      • 2017-10-04
      • 1970-01-01
      • 2020-09-25
      • 1970-01-01
      • 2018-02-18
      相关资源
      最近更新 更多