【问题标题】:Why does pyzmq subscriber behave differently with asyncio?为什么 pyzmq 订阅者与 asyncio 的行为不同?
【发布时间】:2020-10-05 00:10:45
【问题描述】:

我有一个 XPUB/XSUB 设备和多个模拟发布者在一个进程中运行。在一个单独的过程中,我想连接一个订阅者并将收到的消息打印到终端。下面我将展示一个简单函数的两个变体来做到这一点。我将这些函数包装为命令行实用程序。

我的问题是 asyncio 变体从不接收消息

另一方面,非异步变体工作得很好。我已经测试了 ipc 和 tcp 传输的所有情况。发布过程在我的测试中永远不会改变,除非我重新启动它以更改传输。消息是短字符串,大约每秒发布一次,因此我们不考虑性能问题。

订阅者程序无限期地位于msg = await sock.receive_multipart() 行。在 XPUB/XSUB 设备中,我有显示 sock.setsockopt(zmq.SUBSCRIBE, channel.encode()) 消息转发的工具,与非异步变体连接时相同。

asyncio 变体(不工作,如上所述)

def subs(url, channel):
    import asyncio

    import zmq
    import zmq.asyncio

    ctx = zmq.asyncio.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.connect(url)
    sock.setsockopt(zmq.SUBSCRIBE, channel.encode())

    async def task():
        while True:
            msg = await sock.recv_multipart()
            print(' | '.join(m.decode() for m in msg))

    try:
        asyncio.run(task())
    finally:
        sock.setsockopt(zmq.LINGER, 0)
        sock.close()

常规阻塞变体(工作正常)

def subs(url, channel):
    import zmq

    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.connect(url)
    sock.setsockopt(zmq.SUBSCRIBE, channel.encode())

    def task():
        while True:
            msg = sock.recv_multipart()
            print(' | '.join(m.decode() for m in msg))

    try:
        task()
    finally:
        sock.setsockopt(zmq.LINGER, 0)
        sock.close()

对于这个特定的工具,不需要使用 asyncio。但是,我在代码的其他地方也遇到了这个问题,异步接收从未收到。所以我希望通过在这个简单的案例中清理它,我会理解一般出了什么问题。

我的版本是

import zmq
zmq.zmq_version()  # '4.3.2'
zmq.__version__  # '19.0.2'

我使用的是 MacOS 10.13.6。

我完全没有想法。互联网,请帮忙!

【问题讨论】:

  • 既然你没有想法,试着捕捉异常,看看它是否能提供更多信息。也许确认常规 zmq 和 asyncio zmq 组合是否应该工作,或者整个事情是否需要异步。
  • 谢谢苛刻。为清楚起见,我在发布之前退出了所有日志记录调用。我很确定客户端代码在异步接收上永远等待。我提到的发布者进程是使用异步 zmq 套接字编写的,因此唯一的工作情况是异步和非异步。我不明白这应该有什么关系。当然,如果发布过程是相同的,那么连接客户端使用的是什么并不重要?

标签: python python-asyncio zeromq publish-subscribe pyzmq


【解决方案1】:

一个有效的异步变体是

def subs(url, channel):
    import asyncio

    import zmq
    import zmq.asyncio

    ctx = zmq.asyncio.Context.instance()

    async def task():
        sock = ctx.socket(zmq.SUB)
        sock.connect(url)
        sock.setsockopt(zmq.SUBSCRIBE, channel.encode())

        try:
            while True:
                msg = await sock.recv_multipart()
                print(' | '.join(m.decode() for m in msg))
        finally:
            sock.setsockopt(zmq.LINGER, 0)
            sock.close()

    asyncio.run(task())

我的结论是,在使用 asyncio zmq 时,必须使用在等待套接字的事件循环上运行的调用来创建套接字。尽管原始形式没有对事件循环做任何花哨的事情,但似乎套接字具有与asyncio.run 使用的不同的事件循环。我不知道为什么,我没有打开 pyzmq 的问题,因为他们的文档显示了这个答案中的用法,没有评论。

编辑回应评论:

asyncio.run 总是创建一个新的事件循环,因此可能为在传递给asyncio.run 的协程之外实例化的套接字创建的循环(如原始问题中的 asyncio 变体)显然不同。

【讨论】:

  • ...套接字的事件循环与 asyncio.run 使用的不同 - 我不知道为什么 - zmq 可能在内部使用 asyncio.get_event_loop() 来找出调用时的“当前事件循环”,并继续向该循环提交感兴趣的事件。由于asyncio.run() 创建了一个新的事件循环,因此可以保证该循环与之前获得的循环不同。最好组织代码,以便所有与 asyncio 相关的(导入除外)都从 asyncio.run() 调用的函数(或该函数调用的函数等,而不是之前的函数)运行。
  • 谢谢@user4815162342!我应该注意到 asyncio.run always 创建了一个新的事件循环。这一切都清楚了!
猜你喜欢
  • 2021-08-16
  • 1970-01-01
  • 2010-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-18
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多