【问题标题】:ZMQ devices, ioloop and multiprocessingZMQ 设备、ioloop 和多处理
【发布时间】:2015-07-16 02:17:00
【问题描述】:

我正在尝试应用 PUSH/PULL 模式,如下图所示:

                            | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP
---- PUSH ----- DEVICE ---- | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP

PUSH 套接字连接到 ZeroMQ 设备并发出消息,然后将这些消息传播到所有连接的 PULL 套接字。我想要实现的是一种对管道中多个节点的并行处理。 当PULL 套接字完成处理后,它应该通过HTTP 将消息转发到远程端点。

代码如下:

from multiprocessing import Process
import random
import time
import zmq
from zmq.devices import ProcessDevice

from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ioloop.install()


bind_in_port = 5559
bind_out_port = 5560

dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)


def push():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
    server_id = random.randrange(1,10005)
    for i in range(5):
        print("Message %d sent" % i)
        socket.send_string("Push from %s" % server_id)


def pull():
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
    loop = ioloop.IOLoop.instance()

    pull_stream = ZMQStream(socket, loop)

    def on_recv(message):
        print(message)
    pull_stream.on_recv(on_recv)

    loop.start()

Process(target=push).start()

time.sleep(2)

for i in range(2):
    Process(target=pull).start()

虽然消息已正确发送到 ZeroMQ 设备,但我看不到收到任何消息 - 从未调用过 on_recv 回调。

感谢任何帮助。

谢谢

【问题讨论】:

    标签: python tornado zeromq python-multiprocessing


    【解决方案1】:

    设备初始化中缺少上述代码以提供完整答案。 什么是 dev 和 *port ? 一件事可能是在 .connect 之后添加 sleep(1) 以使端口稳定 注意:推/拉时无需设置身份

    【讨论】:

    • 谢谢丹妮。添加睡眠呼叫解决了这个问题。
    • 另一种(更好的?)解决方案是实现握手。例如,发送 \x00 直到收到,然后另一个发送以预先确定的答案(如 \x01)进行响应。那么您可以放心地假设它已连接。保持它去捕捉断开连接。但这超出了原始问题的范围:)
    猜你喜欢
    • 1970-01-01
    • 2012-11-20
    • 2019-09-06
    • 2018-06-21
    • 2017-07-02
    • 1970-01-01
    • 1970-01-01
    • 2023-02-04
    • 1970-01-01
    相关资源
    最近更新 更多