【发布时间】:2020-10-05 00:10:45
【问题描述】:
我有一个 XPUB/XSUB 设备和多个模拟发布者在一个进程中运行。在一个单独的过程中,我想连接一个订阅者并将收到的消息打印到终端。下面我将展示一个简单函数的两个变体来做到这一点。我将这些函数包装为命令行实用程序。
我的问题是 asyncio 变体从不接收消息。
另一方面,非异步变体工作得很好。我已经测试了 ipc 和 tcp 传输的所有情况。发布过程在我的测试中永远不会改变,除非我重新启动它以更改传输。消息是短字符串,大约每秒发布一次,因此我们不考虑性能问题。
订阅者程序无限期地位于msg = await sock.receive_multipart() 行。在 XPUB/XSUB 设备中,我有显示 sock.setsockopt(zmq.SUBSCRIBE, channel.encode()) 消息转发的工具,与非异步变体连接时相同。
asyncio 变体(不工作,如上所述)
def subs(url, channel):
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
async def task():
while True:
msg = await sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
asyncio.run(task())
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
常规阻塞变体(工作正常)
def subs(url, channel):
import zmq
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
def task():
while True:
msg = sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
task()
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
对于这个特定的工具,不需要使用 asyncio。但是,我在代码的其他地方也遇到了这个问题,异步接收从未收到。所以我希望通过在这个简单的案例中清理它,我会理解一般出了什么问题。
我的版本是
import zmq
zmq.zmq_version() # '4.3.2'
zmq.__version__ # '19.0.2'
我使用的是 MacOS 10.13.6。
我完全没有想法。互联网,请帮忙!
【问题讨论】:
-
既然你没有想法,试着捕捉异常,看看它是否能提供更多信息。也许确认常规 zmq 和 asyncio zmq 组合是否应该工作,或者整个事情是否需要异步。
-
谢谢苛刻。为清楚起见,我在发布之前退出了所有日志记录调用。我很确定客户端代码在异步接收上永远等待。我提到的发布者进程是使用异步 zmq 套接字编写的,因此唯一的工作情况是异步和非异步。我不明白这应该有什么关系。当然,如果发布过程是相同的,那么连接客户端使用的是什么并不重要?
标签: python python-asyncio zeromq publish-subscribe pyzmq