【Question title】: Running coroutines and threads in asyncio event loop: Errors while exiting
【Posted】: 2018-07-20 04:42:34
【Question】:

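I have designed a piece of Python code that essentially works as a microservice in a larger system.

I have scheduled two tasks on the event loop, and set up two more tasks to run in an executor.

The strange part is that the code runs well and does everything I expect. But when I end it with a KeyboardInterrupt (Ctrl+C), I see errors and exceptions, which makes me think I am definitely misusing the asyncio patterns here. Here is a brief overview of the code, without going into the lengthy details right away: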

import asyncio
import time


class Prototype:
    def _redis_subscriber(self):
        self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
        self._p.subscribe("channel1")
        while True:
            pubbed_msg = self._p.get_message()
            if pubbed_msg is not None:
                pass  # process process process
            time.sleep(0.01)

    def _generic_worker_on_internal_q(self):
        while True:
            item = self.q.get()  # blocking call
            # process item

    async def task1(self):
        pass  # network I/O bound code

    async def task2(self):
        pass  # network I/O bound code. also fills with self.q.put()

    def run(self):
        asyncio.ensure_future(self.task1(), loop=self._event_loop)
        asyncio.ensure_future(self.task2(), loop=self._event_loop)
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q))
        self._event_loop.run_forever()

if __name__ == '__main__':
    p = Prototype()
    p.run()

I also experimented with another approach in the Prototype.run() method:

def __init__(self):
    self._tasks = []

def run(self):
    self._tasks.append(asyncio.ensure_future(self._task1()))
    self._tasks.append(asyncio.ensure_future(self._task2()))
    self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber)))
    self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q)))
    # run_until_complete() takes a single awaitable, so gather the list first
    self._event_loop.run_until_complete(asyncio.gather(*self._tasks))

Either way, when I try to end the running script with Ctrl+C, it does not exit on the first attempt. I have to press it twice, and this is what comes up:

KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Exception ignored in: <bound method BaseEventLoop.call_exception_handler of <_UnixSelectorEventLoop running=False closed=False debug=False>>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 1296, in call_exception_handler
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1335, in error
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1442, in _log
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1452, in handle
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1514, in callHandlers
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 863, in handle
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1069, in emit
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1059, in _open
NameError: name 'open' is not defined

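Where am I going wrong?

【Question comments】:

  • Maybe try the same with debug enabled. docs.python.org/3/library/asyncio-dev.html ?
  • One of your run_in_executor threads is not exiting on interrupt; the atexit handler sends None to the work queues, then joins the threads. The None signals these threads to exit, but the function that is running never returns, so the worker thread never exits.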

Tags: python multithreading asynchronous redis event-loop


【Answer 1】:

You scheduled two infinite tasks in the executor. These tasks are blocking the exit.

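The default executor runs these tasks in threads that are fed by a queue, and on exit the queue is signalled to stop executing tasks. However, if your tasks never return, the worker threads never get a chance to check for that signal.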
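The mechanism can be seen with a plain concurrent.futures.ThreadPoolExecutor, which is the kind of executor run_in_executor(None, ...) uses by default. The following is only a minimal sketch: `finite_task` and `stop_event` are hypothetical names, and the stop flag is just an illustrative stand-in for "a task that eventually returns", not the technique this answer recommends.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def finite_task(stop_event):
    # Loop only until asked to stop, so the worker thread can return.
    while not stop_event.is_set():
        time.sleep(0.01)
    return "stopped"


stop_event = threading.Event()
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(finite_task, stop_event)

stop_event.set()              # let the task function return
executor.shutdown(wait=True)  # joins the worker thread
result = future.result()
```

Because the task returns, shutdown(wait=True) can join the worker thread. With a `while True:` loop that never returns, the same join would block indefinitely, which matches the `t.join()` frame in the traceback above.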

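You can avoid this state by not running infinite loops. Instead, re-schedule your task each time it reaches the end, and don't block while waiting for a message: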

def _redis_subscriber(self):
    self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
    self._p.subscribe("channel1")

    def process_message():
        # non-blocking task to repeatedly run in the executor
        pubbed_msg = self._p.get_message(False)
        if pubbed_msg is not None:
            pass  # process process process
        time.sleep(0.01)
        # reschedule function for next message
        asyncio.ensure_future(self._event_loop.run_in_executor(None, process_message))

    # kick off first handler
    process_message()

You would still run this function in the executor to kick it off:

def run(self):
    # ...
    asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))

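Do the same for _generic_worker_on_internal_q(), and make sure you avoid blocking calls to Queue.get(); use self.q.get(False) instead. A non-blocking get() raises queue.Empty when no item is available, so catch that exception.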
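As a rough illustration of the non-blocking Queue.get() call, here is a minimal sketch of one pass of such a worker. `process_available` and `handle` are hypothetical names, not part of the original code; the pass handles whatever is already queued and then returns instead of waiting.

```python
import queue


def process_available(q, handle):
    """Handle every item currently in q without blocking; return the count."""
    handled = 0
    while True:
        try:
            item = q.get(False)  # non-blocking, same as q.get_nowait()
        except queue.Empty:
            break  # nothing left right now, so return instead of waiting
        handle(item)
        handled += 1
    return handled


q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

seen = []
count = process_available(q, seen.append)
```

Each pass ends as soon as the queue is empty, so the executor thread is free between passes and can notice a shutdown request.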

You could even use a decorator for the re-scheduling:

import asyncio
from functools import partial, wraps


def auto_reschedule(loop=None, executor=None):
    """Repeatedly re-schedule function in the given executor"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            result = f(*args, **kwargs)
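            next_call = wrapper  # avoid shadowing the built-in callable()
            if args or kwargs:
                next_call = partial(next_call, *args, **kwargs)
            current_loop = loop
            if current_loop is None:
                # Only works in a thread that has a current event loop;
                # pass loop explicitly when running in executor threads.
                current_loop = asyncio.get_event_loop()
            current_loop.run_in_executor(executor, next_call)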
            return result
        return wrapper
    return decorator

Then use this decorator on your inner function, where you have access to the instance attribute that references the loop:

def _redis_subscriber(self):
    self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
    self._p.subscribe("channel1")

    @auto_reschedule(self._event_loop)
    def process_message():
        # non-blocking task to repeatedly run in the executor
        pubbed_msg = self._p.get_message(False)
        if pubbed_msg is not None:
            pass  # process process process
        time.sleep(0.01)

    # kick off first handler
    process_message()

A quick demo of the latter:

import asyncio
import time
import random

# auto_reschedule imported or defined

def create_thread_task(i, loop):
    @auto_reschedule(loop)
    def thread_task():
        print(f'Task #{i} running in worker')
        time.sleep(random.uniform(1, 3))

    return thread_task


def main():
    loop = asyncio.get_event_loop()
    for i in range(5):
        asyncio.ensure_future(
            loop.run_in_executor(None, create_thread_task(i, loop)))
    loop.run_forever()


if __name__ == '__main__':
    main()
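For a variant of the demo that terminates on its own (handy for checking the pattern without pressing Ctrl+C), here is a rough sketch. It is not the code from this answer: `run_rescheduling_demo` and `max_runs` are hypothetical names, the next pass is handed to the loop with loop.call_soon_threadsafe() rather than by calling run_in_executor() directly from the worker thread, and loop.shutdown_default_executor() assumes Python 3.9 or newer.

```python
import asyncio
import time


def run_rescheduling_demo(max_runs=3):
    """Run a self-rescheduling executor job max_runs times, then stop."""
    loop = asyncio.new_event_loop()
    runs = []

    def job():
        # One short pass, executed in a worker thread of the default executor.
        runs.append(time.monotonic())
        if len(runs) < max_runs:
            # Ask the loop thread to submit the next pass to the executor.
            loop.call_soon_threadsafe(loop.run_in_executor, None, job)
        else:
            # Last pass: ask the loop to stop.
            loop.call_soon_threadsafe(loop.stop)

    loop.run_in_executor(None, job)  # kick off the first pass
    try:
        loop.run_forever()
        # Wait for the worker threads to finish (Python 3.9+).
        loop.run_until_complete(loop.shutdown_default_executor())
    finally:
        loop.close()
    return len(runs)


completed = run_rescheduling_demo(3)
```

Because every pass returns quickly, the worker threads are idle between passes and the executor can be shut down without waiting on a function that never ends.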

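【Comments】:

  • You were absolutely right in spotting the cause. Although the solution as proposed did not kick off execution of the two threaded workers at all, I at least got the answer to my original question, and a good perspective on the bad pattern I was using. Thanks.
  • @anomit: Interesting, but the principle is sound. See gist.github.com/mjpieters/fa6277893f500df71f1381f7f61e3ab3 for a working demo of a nested function re-scheduling itself. A keyboard interrupt exits the script cleanly (after at most 3 seconds, as the tasks exit once their time.sleep() calls finish).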