【问题标题】:Python asyncio in a thread for migrating existing codebase用于迁移现有代码库的线程中的 Python asyncio
【发布时间】:2020-08-10 15:12:34
【问题描述】:

我们有一个相当大的项目,它正在做大量的网络(API 调用、Websocket 消息),并且还有很多在线程中间隔运行的内部作业。我们当前的架构涉及产生大量线程,并且当系统处于大负载下时应用程序无法正常运行,因此我们决定尝试 asyncio。

我知道最好的方法是将整个代码库迁移到异步代码,但由于代码库的大小和有限的开发资源,这在不久的将来是不现实的。但是,我们希望开始迁移部分代码库以使用 asyncio 事件循环,并希望我们能够在某个时候转换整个项目。

到目前为止我们遇到的问题是整个代码库都有同步代码,为了在里面添加非阻塞的异步代码,代码需要在不同的线程中运行,因为你不能真正运行异步和同步代码在同一个线程中。

为了结合异步和同步代码,我想出了这种在应用启动时创建的单独线程中运行异步代码的方法。代码的其他部分只需调用 add_asyncio_task 即可将作业添加到此循环中。

import threading
import asyncio
_tasks = []

def threaded_loop(loop):
    asyncio.set_event_loop(loop)
    global _tasks
    while True:
        if len(_tasks) > 0:
            # create a copy of needed tasks
            needed_tasks = _tasks.copy()
            # flush current tasks so that next tasks can be easily added
            _tasks = []
            # run tasks
            task_group = asyncio.gather(*needed_tasks)
            loop.run_until_complete(task_group)


def add_asyncio_task(task):
    _tasks.append(task)

def start_asyncio_loop():
    loop = asyncio.get_event_loop()
    t = threading.Thread(target=threaded_loop, args=(loop,))
    t.start()

在 app.py 中的某处:

start_asyncio_loop()

以及代码中的其他任何地方:

add_asyncio_task(some_coroutine)

由于我是 asyncio 的新手,我想知道在我们的情况下这是否是一种好方法,或者这种方法是否被认为是一种反模式并且会在以后遇到一些问题?或者也许 asyncio 已经为此提供了一些解决方案,而我只是想在这里发明轮子?

感谢您的意见!

【问题讨论】:

    标签: python multithreading concurrency python-asyncio


    【解决方案1】:

    一般来说,这种方法很好。不过你有一些问题:

    (1)Almost all asyncio objects are not thread safe

    (2) 您的代码本身不是线程安全的。如果任务出现在needed_tasks = _tasks.copy() 之后但在_tasks = [] 之前怎么办?你在这里需要一把锁。顺便说一句,复制是没有意义的。简单的needed_tasks = _tasks 就可以了。

    (3) 一些 asyncio 结构是线程安全的。使用它们:

    import threading
    import asyncio
    
    # asyncio.get_event_loop() creates a new loop per thread. Keep
    # a single reference to the main loop. You can even try
    #   _loop = asyncio.new_event_loop()
    _loop = asyncio.get_event_loop()
    
    def get_app_loop():
        return _loop
    
    def asyncio_thread():
        loop = get_app_loop()
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    def add_asyncio_task(task):
        asyncio.run_coroutine_threadsafe(task, get_app_loop())
    
    def start_asyncio_loop():
        t = threading.Thread(target=asyncio_thread)
        t.start()
    

    【讨论】:

    • 非常感谢,这些都是非常好的点。我将循环从主线程传递到新创建的线程,以便在代码的其他部分引用此循环。例如,如果我需要将它传递给第三方库,以便一切都发生在同一个循环中。你认为这是个好主意吗?我同意代码不是线程安全的。感谢您指出这一点并提供解决方案!
    • @Prank100 如果您不打算使用多个循环(为什么会这样?这种要求非常罕见),那么请坚持使用全局 asyncio.get_event_loop()
    • 知道了,谢谢。我是否正确理解当你从不同的线程执行 asyncio.get_event_loop() 时,你会得到不同的循环?还是会是同一个?当前的应用程序严重依赖线程,我担心我会错误地有多个循环
    • @Prank100 哎呀,你是对的。出于某种原因,我认为get_event_loop() 总是指同一个循环。它确实在不同的线程中创建了一个新循环(我刚刚检查了源代码)。在那种情况下,我认为最好使用对循环的全局引用。我已经更新了代码。
    • @Prank100 我会进一步建议add_asyncio_task 返回 run_coroutine_threadsafe 返回的对象。该对象是concurrent.futures.Future,可用于(同步)等待协程完成,并获取其结果以及引发的异常(如果有)。
    猜你喜欢
    • 2018-02-10
    • 1970-01-01
    • 2010-10-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-09-04
    相关资源
    最近更新 更多