【问题标题】:How to clean up after asyncio.wait_for's timeout?asyncio.wait_for 超时后如何清理?
【发布时间】:2019-06-15 15:08:17
【问题描述】:

我的目标是练习使用 asyncio 库。我已经阅读了一些入门教程,现在我想自己编写一些代码。

我想开始两个简单的任务,它们基本上增加了存储在外部类中的公共值。第一个是自动的 - 5 秒后增加一。第二个任务是用户相关的:如果你在这 5 秒内输入了一些值,它也应该被添加。

问题是,当我不输入任何值时,我的循环不会关闭 - 程序仍然处于活动状态并永远运行,直到我强制停止它 - 然后我收到以下错误:

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
Timeout
Loop finished. Value is 1
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Process finished with exit code 0

基本上在“循环完成”之后程序结束,但是当控制台输入没有输入值时,程序就挂起。当我输入任何 v

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
5
Loop finished. Value is 6

Process finished with exit code 0

看起来当 TimeoutError 发生时,在 asyncio.wait_for 之后有一些东西没有清理。你能帮我看看,有什么问题吗?这是我的代码:

import asyncio
import sys


class ValContainer:
    _val = 0

    @staticmethod
    def inc_val(how_many=1):
        ValContainer._val += how_many

    @staticmethod
    def get_val() -> int:
        return ValContainer._val


async def auto_increment():
    print(f'[Auto_increment: ] This task will increment value after 5 seconds')
    await asyncio.sleep(5)
    ValContainer.inc_val()
    return True


async def manual_increment(loop):
    print(f'[Manual increment: ] Waiting 5s for inc value:')
    try:
        future = loop.run_in_executor(None, sys.stdin.readline)
        line = await asyncio.wait_for(future, 5, loop=loop)
        if line:
            try:
                how_many = int(line)
                ValContainer.inc_val(how_many)
            except ValueError:
                print('That\'s not a number!')

    except asyncio.TimeoutError:
        print('Timeout')
    finally:
        return True

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task_auto = loop.create_task(auto_increment())
    task_man = loop.create_task(manual_increment(loop))
    loop.run_until_complete(task_auto)
    loop.run_until_complete(task_man)
    print(f'Loop finished. Value is {ValContainer.get_val()}')
    loop.close()

【问题讨论】:

  • 不确定为什么在增加类值时使用静态方法。如果您希望该类成为命名空间的全局类,那么至少让您自己更轻松,只需使用@classmethod
  • @RomanPerekhrest:这取决于操作系统。你的进程是否真的退出了,还是你必须在它退出之前按回车键?
  • @MartijnPieters,这就是问题所在 - “在 asyncio.wait_for 之后有些东西没有清理”
  • @Asmox,您需要退出整个程序还是确保事件循环已关闭?
  • @RomanPerekhrest:事件循环已关闭,他们在问为什么他们的进程没有退出。如果循环没有关闭,您将没有 Loop finished. Value is 1 输出。

标签: python python-3.x python-asyncio


【解决方案1】:

您已经在线程池执行程序中启动了一个单独的线程,以及那些can't actually be cancelledasyncio 'delegate' 任务被取消,但sys.stdin.readline 调用将无限期地坐在那里。你可以按回车键结束它,因为这会给你一个完整的sys.stdin 行。

你必须在这里使用one of the work-arounds to cancel the read;请注意,您不能告诉 ThreadPoolExecutor 使用守护线程。

如果在异步上下文中等待用户输入作为单独的任务,创建自己的线程可能比让THreadPoolExecutor 为您管理线程更容易,因此您可以设置daemon=True在该线程上,并让进程在退出时终止该线程。

【讨论】:

  • 可以考虑另一种解决方法来取消整个“run_in_executor using ThreadPoolExecutor”。 (gist.github.com/yeraydiazdiaz/b8c059c6dcfaf3255c65806de39175a7) 不仅仅是sys.stdin
  • @RomanPerekhrest:不,这也行不通,正如要点中所述:shutdown 旨在阻止新工作进入,但不是为了阻止现有工作 .该要点使用sleep()s 调用,最终自行退出。
  • @RomanPerekhrest:你得到的只是线程上没有 join() 调用。该进程仍将等待线程,直到您发出中断。
  • 我觉得我不喜欢 run_in_executor(<None, THreadPoolExecutor> 如此难以管理并导致这样的变通办法的事实。我会选择一个可以识别/杀死的单独线程/进程,
  • @RomanPerekhrest 这不限于ThreadPoolExecutor,这是线程的问题,句号。单独的进程也有限制,例如,您需要担心分离流。
猜你喜欢
  • 1970-01-01
  • 2023-01-13
  • 1970-01-01
  • 2016-08-01
  • 1970-01-01
  • 1970-01-01
  • 2011-06-29
  • 1970-01-01
  • 2021-02-24
相关资源
最近更新 更多