【问题标题】:Python asyncio run_coroutine_threadsafe never running coroutine?Python asyncio run_coroutine_threadsafe 从不运行协程?
【发布时间】:2018-08-09 20:37:19
【问题描述】:

我不确定我在这里做错了什么,我正在尝试创建一个包含队列并使用协程来使用该队列上的项目的类。问题是事件循环在一个单独的线程中运行(在那个线程中我使用loop.run_forever() 让它运行)。

我看到的是消费项目的协程从未被触发:

import asyncio
from threading import Thread

import functools

# so print always flushes to stdout
print = functools.partial(print, flush=True)


def start_loop(loop):
    def run_forever(loop):
        print("Setting loop to run forever")
        asyncio.set_event_loop(loop)
        loop.run_forever()
        print("Leaving run forever")

    asyncio.set_event_loop(loop)
    print("Spawaning thread")
    thread = Thread(target=run_forever, args=(loop,))
    thread.start()


class Foo:
    def __init__(self, loop):
        print("in foo init")
        self.queue = asyncio.Queue()
        asyncio.run_coroutine_threadsafe(self.consumer(self.queue), loop)

    async def consumer(self, queue):
        print("In consumer")
        while True:
            message = await queue.get()
            print(f"Got message {message}")
            if message == "END OF QUEUE":
                print(f"exiting consumer")
                break
            print(f"Processing {message}...")


def main():
    loop = asyncio.new_event_loop()
    start_loop(loop)
    f = Foo(loop)
    f.queue.put("this is a message")
    f.queue.put("END OF QUEUE")

    loop.call_soon_threadsafe(loop.stop)

    # wait for the stop to propagate and complete
    while loop.is_running():
        pass


if __name__ == "__main__":
    main()

输出:

产卵线程 设置循环永远运行 在 foo 初始化 永远离开跑步

【问题讨论】:

    标签: python python-multithreading python-asyncio


    【解决方案1】:

    这段代码有几个问题。

    首先,检查警告:

    test.py:44: RuntimeWarning: coroutine 'Queue.put' was never awaited
    f.queue.put("this is a message")
    test.py:45: RuntimeWarning: coroutine 'Queue.put' was never awaited
    f.queue.put("END OF QUEUE")
    

    这意味着queue.put 是一个协程,所以它必须使用run_coroutine_threadsafe 运行:

    asyncio.run_coroutine_threadsafe(f.queue.put("this is a message"), loop)
    asyncio.run_coroutine_threadsafe(f.queue.put("END OF QUEUE"), loop)
    

    您也可以使用queue.put_nowait,这是一种同步方法。但是,asyncio 对象通常不是线程安全的,因此每次同步调用都必须经过call_soon_threadsafe

    loop.call_soon_threadsafe(f.queue.put_nowait, "this is a message")
    loop.call_soon_threadsafe(f.queue.put_nowait, "END OF QUEUE")
    

    另一个问题是循环在消费者任务开始处理项目之前停止。您可以在Foo 类中添加一个join 方法以等待消费者完成:

    class Foo:
        def __init__(self, loop):
            [...]
            self.future = asyncio.run_coroutine_threadsafe(self.consumer(self.queue), loop)
    
        def join(self):
            self.future.result()
    

    然后确保在停止循环之前调用此方法:

    f.join()
    loop.call_soon_threadsafe(loop.stop)
    

    这应该足以让程序按您的预期运行。但是,这段代码在几个方面仍然存在问题。

    首先,不应在主线程和额外线程中都设置循环。 Asyncio 循环并不意味着在线程之间共享,因此您需要确保所有与 asyncio 相关的事情都发生在专用线程中。

    由于Foo 负责这两个线程之间的通信,因此您必须格外小心以确保每一行代码都在正确的线程中运行。例如,asyncio.Queue 的实例化必须发生在 asyncio 线程中。

    请参阅 this gist 以获取程序的更正版本。


    另外,我想指出,这不是 asyncio 的典型用例。您通常希望在主线程中运行 asyncio 循环,特别是如果您需要subprocess support

    asyncio 支持从不同线程运行子进程,但有限制:

    • 事件循环必须在主线程中运行
    • 在从其他线程执行子进程之前,必须在主线程中实例化子观察程序。在主线程中调用get_child_watcher()函数来实例化child watcher。

    我建议以另一种方式设计您的应用程序,即在主线程中运行 asyncio 并使用 run_in_executor 作为同步阻塞代码。

    【讨论】:

      猜你喜欢
      • 2019-12-18
      • 2020-04-14
      • 2020-07-03
      • 2020-10-10
      • 1970-01-01
      • 2020-12-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多