【问题标题】:Pausing Python asyncio coroutines暂停 Python 异步协程
【发布时间】:2018-01-02 03:05:51
【问题描述】:

由于我的项目严重依赖于异步网络 I/O,我总是不得不预期会发生一些奇怪的网络错误:无论是我连接的服务出现 API 中断,还是我自己的服务器出现网络问题, 或者是其他东西。这样的问题出现了,并且没有真正的解决方法。因此,我最终试图找出一种方法,以在发生此类网络问题时从外部有效地“暂停”协程的执行,直到重新建立连接。我的方法是编写一个装饰器pausable,它接受一个参数pause,这是一个协程函数,它将是yielded from / awaited,如下所示:

def pausable(pause, resume_check=None, delay_start=None):
    if not asyncio.iscoroutinefunction(pause):
        raise TypeError("pause must be a coroutine function")
    if not (delay_start is None or asyncio.iscoroutinefunction(delay_start)):
        raise TypeError("delay_start must be a coroutine function")

    def wrapper(coro):
        @asyncio.coroutine
        def wrapped(*args, **kwargs):
            if delay_start is not None:
                yield from delay_start()
            for x in coro(*args, **kwargs):
                try:
                    yield from pause()
                    yield x
                # catch exceptions the regular discord.py user might not catch
                except (asyncio.CancelledError,
                        aiohttp.ClientError,
                        websockets.WebSocketProtocolError,
                        ConnectionClosed,
                        # bunch of other network errors
                        ) as ex:
                    if any((resume_check() if resume_check is not None else False and
                            isinstance(ex, asyncio.CancelledError),
                            # clean disconnect
                            isinstance(ex, ConnectionClosed) and ex.code == 1000,
                            # connection issue
                            not isinstance(ex, ConnectionClosed))):
                        yield from pause()
                        yield x
                    else:
                        raise

        return wrapped
    return wrapper

特别注意这一点:

for x in coro(*args, **kwargs):
    yield from pause()
    yield x

使用示例(readyasyncio.Event):

@pausable(ready.wait, resume_check=restarting_enabled, delay_start=ready.wait)
@asyncio.coroutine
def send_test_every_minute():
    while True:
        yield from client.send("Test")
        yield from asyncio.sleep(60)

但是,这似乎不起作用,对我来说似乎不是一个优雅的解决方案。是否有与 Python 3.5.3 及更高版本兼容的有效解决方案?兼容 Python 3.4.4 及更高版本是可取的。

附录

只是try/excepting 需要暂停的协程中引发的异常对我来说既不可行也不可行,因为它严重违反了我想要的核心代码设计原则(DRY)遵从;换句话说,在这么多协程函数中排除这么多异常会让我的代码变得混乱。

【问题讨论】:

    标签: python python-3.x asynchronous concurrency python-asyncio


    【解决方案1】:

    关于当前解决方案的几句话。

    for x in coro(*args, **kwargs):
        try:
            yield from pause()
            yield x
        except
            ...
    

    您将无法通过这种方式捕获异常:

    • 在 for 循环之外引发异常
    • 在第一次异常之后生成器已经耗尽(不可用)

    .

    @asyncio.coroutine
    def test():
        yield from asyncio.sleep(1)
        raise RuntimeError()
        yield from asyncio.sleep(1)
        print('ok')
    
    
    
    @asyncio.coroutine
    def main():
        coro = test()
        try:
            for x in coro:
                try:
                    yield x
                except Exception:
                    print('Exception is NOT here.')
        except Exception:
            print('Exception is here.')
    
            try:
                next(coro)
            except StopIteration:
                print('And after first exception generator is exhausted.')
    
    
    if __name__ ==  '__main__':
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
    

    输出:

    Exception is here.
    And after first exception generator is exhausted.
    

    即使可以恢复,也要考虑如果协程由于异常已经做了一些清理操作会发生什么。


    鉴于以上所有情况,如果某个协程引发异常,您唯一的选择是抑制此异常(如果您愿意)并重新运行此协程。如果需要,您可以在某些事件后重新运行它。像这样的:

    def restart(ready_to_restart):
        def wrapper(func):
            @asyncio.coroutine
            def wrapped(*args, **kwargs):
                while True:
                    try:
                        return (yield from func(*args, **kwargs))
                    except (ConnectionClosed,
                            aiohttp.ClientError,
                            websockets.WebSocketProtocolError,
                            ConnectionClosed,
                            # bunch of other network errors
                            ) as ex:
                        yield from ready_to_restart.wait()
    
    
    ready_to_restart = asyncio.Event()  # set it when you sure network is fine 
                                        # and you're ready to restart
    

    更新

    但是,我如何让协程继续原处 现在被打断了?

    只是为了说清楚:

    @asyncio.coroutine
    def test():
        with aiohttp.ClientSession() as client:
            yield from client.request_1()
            # STEP 1:
            # Let's say line above raises error
    
            # STEP 2:
            # Imagine you you somehow maged to return to this place
            # after exception above to resume execution.
            # But what is state of 'client' now?
            # It's was freed by context manager when we left coroutine.
            yield from client.request_2()
    

    函数和协程也没有设计为在异常传播到外部后恢复执行。

    唯一想到的是将复杂操作拆分为可重新启动的小操作,而整个复杂操作可以存储其状态:

    @asyncio.coroutine
    def complex_operation():
        with aiohttp.ClientSession() as client:
            res = yield from step_1(client)
    
            # res/client - is a state of complex_operation.
            # It can be used by re-startable steps.
    
            res = yield from step_2(client, res)
    
    
    @restart(ready_to_restart)
    @asyncio.coroutine
    def step_1():
        # ...
    
    
    @restart(ready_to_restart)
    @asyncio.coroutine
    def step_2():
        # ...
    

    【讨论】:

    • 我真的很喜欢这种方法,感谢您的出色回答!但是,我如何让协程在它现在被中断的地方继续?我考虑过给它传递一个名为state 的字典,协程必须在其中存储它的变量并从一开始就检索它的变量。 Example implementation
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-12-27
    • 1970-01-01
    • 2021-09-09
    • 1970-01-01
    • 2018-02-10
    • 2021-08-04
    • 1970-01-01
    相关资源
    最近更新 更多