【问题标题】:Working of concurrent.futures.ThreadPoolExecutor max workers when scaling up the application扩展应用程序时 concurrent.futures.ThreadPoolExecutor max workers 的工作
【发布时间】:2021-06-22 05:20:11
【问题描述】:

我是 Python 编程新手。我的大部分代码都使用asyncio,因为我正在对数据库进行 IO 调用,但在某些情况下,我使用的是长时间运行的非异步方法,就像对数据库的少数 Pandas 框架调用一样,因此要避免限制可扩展性的阻塞调用,我使用concurrent.futures.ThreadPoolExecutor执行阻塞方法如下:

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
      values = executor.map(func, data)

上面的func提供了最大长度为2的数据集合,基本上需要不超过2个线程,但是当多个用户进来,应用程序需要扩展时,那个时候最好@ 987654326@值:

  1. 是不是每个用户都需要,是2个
  2. 是否应该是最大可能值,如链接中所述 - https://docs.python.org/3/library/concurrent.futures.html

3.8 版更改:max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。此默认值为 I/O 绑定任务保留至少 5 个工作人员。它使用最多 32 个 CPU 内核来执行释放 GIL 的 CPU 绑定任务。并且它避免了在多核机器上隐式使用非常大的资源。

  1. 我就不提了,可以按要求生成

要点仍然存在,如果 10 个用户开始执行相同的操作,他们最终会使用相同的 ThreadPoolExecutor(shared) 还是最终获得不同的执行程序,因为这不是共享对象。我想确保在扩大应用程序时不会因设计不正确而受到影响

【问题讨论】:

    标签: python python-3.x python-asyncio threadpoolexecutor


    【解决方案1】:

    如果你从异步代码中调用ThreadPoolExecutor,你应该使用asynciorun_in_executor函数,否则会阻塞主事件循环。

    如果额外的工作负载受 CPU 限制,那么您还应该改用 ProcessPoolExecutor

    Python 文档中的示例:

    import asyncio
    import concurrent.futures
    
    def cpu_bound():
        # CPU-bound operations will block the event loop:
        # in general it is preferable to run them in a
        # process pool.
        return sum(i * i for i in range(10 ** 7))
    
    async def main():
        loop = asyncio.get_running_loop()
    
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, cpu_bound)
            print('custom process pool', result)
    
    asyncio.run(main())
    

    对于max_workers,默认值通常就可以了:

    • ThreadPoolExecutor:min(32, (os.cpu_count() or 1) + 4)

    • ProcessPoolExecutor:os.cpu_count() or 1

    这取决于您的工作负载(CPU 与 I/O 限制),但对于 CPU 限制任务,没有必要将其设置为大于可用 CPU 的数字,因为它实际上可能会由于上下文切换等而降低性能。

    两个执行器都使用队列在可用线程/进程上排队和调度任务。

    更新:2021 年 3 月 25 日星期四 15:17:51 UTC

    asyncio 事件循环是单线程的,因此当您同时安排其他协程时,您会看到该问题。如您所见,none-blocking 任务被blocking executor 阻止了 10 秒:

    $ python test.py
    START none-blocking executor: (scheduled: 5.0s)
    START none-blocking: (scheduled: 1.0s)
    START blocking executor: (scheduled: 10.0s)
    END none-blocking executor: (elapsed: 5.0s)
    END blocking executor: (elapsed: 10.0s)
    END none-blocking: (elapsed: 10.0s)
    

    如果你运行了几次,blocking executor 将首先启动,none-blocking 任务甚至不会在 blocking executor 结束之前启动:

    $ python test.py
    START none-blocking executor: (scheduled: 5.0s)
    START blocking executor: (scheduled: 10.0s)
    END none-blocking executor: (elapsed: 5.0s)
    END blocking executor: (elapsed: 10.0s)
    START none-blocking: (scheduled: 1.0s)
    END none-blocking: (elapsed: 1.0s)
    

    当您注释掉blocking executor 时,您可以看到所有调用现在都是异步的:

    $ python test.py
    START none-blocking executor: (scheduled: 5.0s)
    START none-blocking: (scheduled: 1.0s)
    END none-blocking: (elapsed: 1.0s)
    END none-blocking executor: (elapsed: 5.0s)
    

    关键的一点是,一旦您开始编写异步代码,就不能将其与同步调用混为一谈。

    test.py:

    import asyncio
    import time
    
    from concurrent.futures import ThreadPoolExecutor
    
    
    def blocking(msg, t):
        t1 = time.perf_counter()
    
        print(f"START {msg}: (scheduled: {t}s)")
        time.sleep(t)
        print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")
    
    
    async def task1(msg, t):
        t1 = time.perf_counter()
    
        print(f"START {msg}: (scheduled: {t}s)")
        await asyncio.sleep(t)
        print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")
    
    
    async def task2(msg, t):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(blocking, msg, t)
            future.result()
    
    
    async def main():
        loop = asyncio.get_running_loop()
    
        aws = [
            task1("none-blocking", 1.0),
            loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
            task2("blocking executor", 10.0),
        ]
    
        for coro in asyncio.as_completed(aws):
            await coro
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    【讨论】:

    • 这是否意味着在两个调用之后的异步代码中是完全相同的cpu_bound() 或使用Thread Pool Executor 运行相同,映射或提交,两者都是阻塞调用,但它们的行为应该相同方式在非异步代码中。我的理解是使用线程执行器产生一个新线程来释放主 GIL 线程,否则它将在所有情况下保持阻塞
    • run_in_executor运行它的好处可能是使用为异步处理准备的事件循环,从而排队操作并增加系统可扩展性
    • @MrinalKamboj 默认情况下run_in_executor 将使用ThreadPoolExecutor,请参阅完整示例here。正如我所提到的,这取决于你如何调用ThreadPoolExecutor,如果那是来自异步代码,那么如果你不在run_in_executor 中运行它,那么在执行程序运行时没有其他协程能够运行。
    • 你说的是下面的代码不正确,@ 987654325@,但是我在运行它时找不到问题,但是感谢你的离开,让我看看各种选项/文档可用
    • @MrinalKamboj 查看我对您的评论的更新。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-17
    • 2017-05-08
    • 1970-01-01
    • 2015-07-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多