【问题标题】:Waiting on condition variable with timeout: lock not reacquired in time等待条件变量超时:未及时重新获取锁
【发布时间】:2019-12-13 01:16:45
【问题描述】:

我有一个名为condasyncio.Condition。我想等它,但只能等那么久才放弃。由于asyncio.Condition.wait 不会超时,因此无法直接执行此操作。 The docs 声明 asyncio.wait_for 应该用于包装并提供超时:

asyncio.wait_for() 函数可用于在超时后取消任务。

因此,我们得出以下解决方案:

async def coro():
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

现在假设coro 本身在运行五秒后被取消。这会在wait_for 中抛出CancelledError,这会在重新引发错误之前取消cond.wait。然后错误传播到coro,由于async with 块,隐式尝试释放cond 中的锁。但是,当前没有持有锁; cond.wait 已被取消,但没有机会处理该取消并重新获取锁。因此,我们得到一个丑陋的异常,如下所示:

Taking lock...
Lock acquired.
Waiting!
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at [REDACTED]> exception=RuntimeError('Lock is not acquired.',)>
Traceback (most recent call last):
  [REDACTED], in coro
    await asyncio.wait_for(cond.wait(), timeout=999)
  [REDACTED], in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  [REDACTED], in coro
    print("Was notified!")
  [REDACTED], in coro
    res = func(*args, **kw)
  [REDACTED], in __aexit__
    self.release()
  [REDACTED], in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

换句话说,在处理CancelledError 时,coro 在尝试释放未持有的锁时引发了RuntimeError。堆栈跟踪显示print("Was notified!") 行的原因是因为这是有问题的async with 块的最后一行。


这感觉不是我能解决的问题;我开始怀疑这是图书馆本身的一个错误。但是,我想不出任何方法来避免这个问题或创建一个解决方法,所以任何想法都会受到赞赏。

在写这个问题并进一步调查时,我在 Python 错误跟踪器上偶然发现了类似的问题,最后检查了 asyncio 源代码,并确定这实际上是 asyncio 本身的错误。

我已将它提交给问题跟踪器here,供有同样问题的人使用,并用我创建的解决方法回答了我自己的问题。


编辑:应 ParkerD 的要求,这里是产生上述问题的完整可运行示例:

编辑 2: 更新示例以使用 Python 3.7+ 中的新 asyncio.runasyncio.create_task 功能

import asyncio

async def coro():
    cond = asyncio.Condition()
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

async def cancel_after_5(c):
    task = asyncio.create_task(c)
    await asyncio.sleep(5)
    task.cancel()
    await asyncio.wait([task])

asyncio.run(cancel_after_5(coro()))

【问题讨论】:

  • 你能贴出调用coro的代码吗?当我运行并取消它时,一切似乎都正常
  • @ParkerD 我添加了一个可运行的示例。
  • 所以,如果你使用asyncio.run(cancel_after_5(coro())) 而不是asyncio.get_event_loop().run_until_complete(cancel_after_5(coro())) 对我来说效果很好,这很有趣。
  • @ParkerD 刚刚检查过了。看起来问题是在事件循环之外分配cond = asyncio.Condition()。如果你使用asyncio.run,它会创建自己的事件循环,这不是cond 看到的那个,所以你会得到一个got Future attached to a different loop 异常而不是预期的CancelledErrorRuntimeError: Lock is not acquired。出于某种原因,除非您明确捕获并打印它,否则这是无声的。我将编辑帖子以使用asyncio.run 给出一个可运行的示例。

标签: python python-asyncio


【解决方案1】:

如问题末尾所述,我已确定该问题实际上是库中的错误。我将重申该错误的问题跟踪器是here,并介绍我的解决方法。

以下函数基于wait_for 本身(来源here),是专门用于等待条件的版本,并附加保证取消它是安全的。

调用wait_on_condition_with_timeout(cond, timeout) 大致相当于asyncio.wait_for(cond.wait(), timeout)

async def wait_on_condition_with_timeout(condition: asyncio.Condition, timeout: float) -> bool:
    loop = asyncio.get_event_loop()

    # Create a future that will be triggered by either completion or timeout.
    waiter = loop.create_future()

    # Callback to trigger the future. The varargs are there to consume and void any arguments passed.
    # This allows the same callback to be used in loop.call_later and wait_task.add_done_callback,
    # which automatically passes the finished future in.
    def release_waiter(*_):
        if not waiter.done():
            waiter.set_result(None)

    # Set up the timeout
    timeout_handle = loop.call_later(timeout, release_waiter)

    # Launch the wait task
    wait_task = loop.create_task(condition.wait())
    wait_task.add_done_callback(release_waiter)

    try:
        await waiter  # Returns on wait complete or timeout
        if wait_task.done():
            return True
        else:
            raise asyncio.TimeoutError()

    except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])
        raise

    finally:
        timeout_handle.cancel()

关键部分是,如果发生超时或取消,该方法会等待条件重新获得锁,然后再重新引发异常:

except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])  # This line is missing from the real wait_for
        raise

我已经在 Python 3.6.9 上对此进行了测试,它运行良好。 3.7 和 3.8 中也存在相同的错误,所以我想它对这些版本也很有用。如果您想知道何时修复该错误,请查看上面的问题跟踪器。如果您想要Conditions 以外的其他版本,更改参数和create_task 行应该很简单。

【讨论】:

    猜你喜欢
    • 2019-03-11
    • 1970-01-01
    • 2011-01-24
    • 1970-01-01
    • 2020-06-30
    • 2016-12-16
    • 2010-12-29
    • 2011-01-07
    • 1970-01-01
    相关资源
    最近更新 更多