【问题标题】:Independently consuming websocket messages (Python)独立消费 websocket 消息(Python)
【发布时间】:2020-04-06 18:34:23
【问题描述】:

有没有办法使用两个单独的async for 循环从 websocket 连接中消费两次消息?

运行下面的代码会给出 RuntimeError: cannot call recv 而另一个协程已经在等待下一条消息

import websockets
import asyncio

async def foo(ws):
    async for msg in ws:
        print(f"foo: {msg}")


async def bar(ws):
    async for msg in ws:
        print(f"bar: {msg}")


async def main():
    async with websockets.connect("wss://echo.websocket.org") as ws:
        asyncio.create_task(foo(ws))
        asyncio.create_task(bar(ws))
        await ws.send("Hello")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

【问题讨论】:

  • 您想阅读同一条消息两次,还是一次阅读两条消息
  • 阅读同一条消息两次。

标签: python python-3.x websocket python-asyncio


【解决方案1】:

您可以创建一个广播函数,该函数读取一次ws,并将每条消息传输到多个生成器。例如(未经测试):

def broadcast(stream, num):
    # iterate over the stream just once, but put each message into
    # multiple queues
    queues = []
    async def consume():
        async for msg in stream:
            for queue in queues:
                await queue.put(msg)
        for queue in queues:
            await queue.put(None)
    asyncio.create_task(consume())

    # create the queues and return the generators that transmit
    # their contents
    async def transmit(queue):
        while True:
            msg = await queue.get()
            if msg is None:
                break
            yield msg

    iters = []
    for _ in range(num):
        queue = asyncio.Queue()
        iters.append(transmit(queue))
        queues.append(queue)

    return iters

有了这个,你main() 可能看起来像这样:

async def main():
    async with websockets.connect("wss://echo.websocket.org") as ws:
        foo_stream, bar_stream = broadcast(ws, 2)
        asyncio.create_task(foo(foo_stream))
        asyncio.create_task(bar(bar_stream))
        await ws.send("Hello")

【讨论】:

  • 谢谢,这太完美了!
  • 不过只有一个问题。如果在foo 中跳出for 循环,消息是否仍会广播到foo_stream?如果我创建大量流并且经常中断它们,这会阻塞内存吗?
  • @Jack 很好发现。这种设计期望所有返回的生成器都被耗尽,否则将无限地在队列中累积消息。 (我预计网络会成为瓶颈。)您可以将asyncio.Queue() 替换为asyncio.Queue(1),这将在您暂停排放其中一些发电机时提供背压,但代价是暂停其他发电机。如果您希望能够永久脱离生成器,则需要添加更多通信代码。这是一个很好的练习,并且(如果你遇到困难)可能是单独问题的材料。
  • 感谢您的回复。如果问得不算多,您能否就我必须编写的额外“通信代码”指出正确的方向?是否涉及 with 语句?
  • @Jack with 可以用来使它更方便,但重要的是实现功能。您需要能够从队列列表中删除队列。例如。您可以与transmit 一起创建一个新函数,并将其附加为iters 中每个生成器的您选择的属性(例如cancel)。这将允许调用者在中断async for msg in stream 后调用类似stream.cancel() 的东西。 (是的,您可以通过提供上下文管理器并强制使用 with 来实现自动化,但您需要先具备相应的功能。)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-08-25
  • 2021-07-11
  • 2019-08-27
  • 1970-01-01
  • 2017-09-08
  • 2015-11-25
  • 2021-12-02
相关资源
最近更新 更多