【问题标题】:Send data via websocket from synchronous iterator in Starlette从 Starlette 中的同步迭代器通过 websocket 发送数据
【发布时间】:2020-01-10 03:35:55
【问题描述】:

我有一个同步迭代器,它来自第三方包。迭代器查询外部服务并产生一些数据。如果没有数据,则迭代器等待它。我将来自 Starlette 的 WebSocketEndpoint 子类化,以通过 websocket 从迭代器发送新数据。不幸的是,我似乎不明白某些内容,并且我的代码无法按预期工作。这是一个稍微简化的代码:

import time

from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket


class Iterator:
    """This is a third-party object, not asynchronous at all."""

    def __init__(self):
        self._stop = False

    def __iter__(self):
        self.i = 0
        return self

    def __next__(self):
        if self._stop:
            raise StopIteration

        time.sleep(5)
        self.i += 1
        print(self.i)
        return self.i

    def cancel(self):
        self._stop = True


class MyWebSocket(WebSocketEndpoint):
    def __init__(self, scope, receive, send) -> None:
        super().__init__(scope, receive, send)

        self.iterator = Iterator()

    async def on_connect(self, websocket: WebSocket) -> None:
        await super().on_connect(websocket)

        for message in self.iterator:
            await websocket.send_json({"message": message})

    async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
        await super().on_disconnect(websocket, close_code)

        self.iterator.cancel()

第一个问题 - 代码不通过 websocket 发送任何数据。 print 语句表明,迭代器产生数据,但实际上没有发送任何数据。如果我将return 放在websocket.send_json() 之后,它将正确发送来自迭代器的第一个结果,但循环将在之后完成。为什么?

另一个问题是迭代器完全阻塞了应用程序的执行。我理解它为什么会发生,但由于它是一个 Web 服务,并且迭代器旨在工作,直到客户端与 Websocket 断开连接,它很容易阻塞我的整个应用程序。如果我有 10 个工作人员,那么 10 个 websocket 客户端将阻止应用程序,并且在其中一个 websocket 断开连接之前将无法执行任何操作。我该如何解决?

【问题讨论】:

    标签: python websocket python-asyncio starlette


    【解决方案1】:

    这是一个第三方对象,根本不是异步的。

    这就是问题所在 - asyncio 是单线程的,因此您的迭代器必须完全不阻塞(例如在迭代内置集合时),或者您必须使用 async iteratorasync for 循环这将在等待下一个项目时暂停执行。

    在处理第三方阻塞函数时,您可以使用run_in_executor 将其合并到异步代码中,这会将函数提交到线程池并暂停当前协程直到函数完成。您不能将迭代器直接传递给run_in_executor,但您可以创建一个包装器,它采用同步迭代器并运行__next__run_in_executor 的每个单独调用,提供异步迭代器的接口。例如:

    async def wrap_iter(iterable):
        loop = asyncio.get_event_loop()
        it = iter(iterable)
    
        DONE = object()
        def get_next_item():
            # Get the next item synchronously.  We cannot call next(it)
            # directly because StopIteration cannot be transferred
            # across an "await".  So we detect StopIteration and
            # convert it to a sentinel object.
            try:
                return next(it)
            except StopIteration:
                return DONE
    
        while True:
            # Submit execution of next(it) to another thread and resume
            # when it's done.  await will suspend the coroutine and
            # allow other tasks to execute while waiting.
            next_item = await loop.run_in_executor(None, get_next_item)
            if next_item is DONE:
                break
            yield next_item
    

    现在您可以将for message in self.iterator 替换为async for message in wrap_iter(self.iterator),一切正常。

    【讨论】:

    • 谢谢,我错过了async for 的存在。
    猜你喜欢
    • 2012-03-06
    • 2014-02-04
    • 1970-01-01
    • 1970-01-01
    • 2017-01-09
    • 2015-12-10
    • 2021-06-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多