【问题标题】:Python asyncio and threading are not working parallelPython asyncio 和线程不能并行工作
【发布时间】:2021-05-27 00:24:07
【问题描述】:

我有以下代码,用于测试目的。

这只是一个调用“5000x factorial(900)”并打印输出的函数

我使用线程还是异步都没关系,它们总是在另一个函数之后运行一个函数,从不并行。

第一个使用 asyncio:

import asyncio
async def factorial(name, number):
    def fatorial(n):
        if n == 1:
            return 1
        else:
            return n * fatorial(n - 1)
    print(f"START: Task {name}: factorial({number})")
    for i in range(5000):
        var = fatorial(number)
    print(f"FIM: Task {name}: factorial({number})")
    return var

async def main():
    task1 = asyncio.ensure_future(factorial("A", 900))
    task2 = asyncio.ensure_future(factorial("B", 900))
    task3 = asyncio.ensure_future(factorial("C", 900))
    task4 = asyncio.ensure_future(factorial("D", 900))
    await asyncio.gather(task1, task2, task3, task4)


asyncio.run(main())

也试过了:

async def main():
    # Schedule three calls *concurrently*:
    task1 = asyncio.create_task(factorial("A", 900))
    task2 = asyncio.create_task(factorial("B", 900))
    task3 = asyncio.create_task(factorial("C", 900))
    task4 = asyncio.create_task(factorial("D", 900))
    await task4


asyncio.run(main())

还尝试了线程:

import threading
def factorial(name, number):
    def fatorial(n):
        if n == 1:
            return 1
        else:
            return n * fatorial(n - 1)
    print(f"START: Task {name}: factorial({number})")
    for i in range(5000):
        var = fatorial(number)
    print(f"FIM: Task {name}: factorial({number})")


threading.Thread(target=factorial("A", 900), daemon=True).start()
threading.Thread(target=factorial("B", 900), daemon=True).start()
threading.Thread(target=factorial("C", 900), daemon=True).start()
threading.Thread(target=factorial("D", 900), daemon=True).start()

而且输出总是一样的:

START: Task A: factorial(900)
FIM: Task A: factorial(900)
START: Task B: factorial(900)
FIM: Task B: factorial(900)
START: Task C: factorial(900)
FIM: Task C: factorial(900)
START: Task D: factorial(900)
FIM: Task D: factorial(900)

【问题讨论】:

  • 异步和线程都不适合 CPU 密集型工作。你最好使用 multiprocessing 或 concurrent.futures.ProcessorPoolExecutor。
  • 谢谢@dirn,但遗憾的是它也没有用。我尝试了这两个选项,总是相同的结果

标签: python python-3.x python-asyncio python-multithreading


【解决方案1】:

正如 cmets 中所说,这些都不适用于 CPU 密集型工作 - 但是是的,它们可以在一定程度上并行运行 - 与按顺序运行相比,它只需要一些时间,或者多一点。您的代码在这两种情况下都不正确

对于 asyncio 位:asyncio 只是在遇到 await 指令或等效指令时停止一个任务以运行另一个任务。没有“等待”,您的代码直接运行直到完成,没有任务切换的机会。

awaits 在代码中自然而然地出现在最适合作为异步运行的代码中,因为它们被放置在对 I/O 的调用之前,这应该需要一些时间(您等待对 db 服务器的查询或 http 请求, 例如)。 在像这样的 CPU 绑定循环中,没有什么可等待的,所以,如果你想变得更好,让其他代码并行运行,你必须引入那些“漏洞”。一种方法是等待致电asyncio.sleep。 如果您在内部阶乘的末尾放置一个这样的调用,您应该会看到它并行运行:

import asyncio
async def factorial(name, number):
    async def fatorial(n):
        await asyncio.sleep(0)
        if n == 1:
            return 1
        else:
            return n * fatorial(n - 1)
    print(f"START: Task {name}: factorial({number})")
    for i in range(5000):
        var = await fatorial(number)
    print(f"FIM: Task {name}: factorial({number})")
    return var



在带螺纹的情况下,有一个不同的错误。与async def 声明的函数不同,当您创建一个以函数为目标的线程时,您不能调用该函数。您只需将函数作为对象传递给线程构造函数,并将参数分别传递给每个对象。实际的函数调用将在线程内部进行。按照您的操作方式,您只是在调用创建每个线程之前急切地调用了所有函数(调用threading.Thread(...) 中括号内的所有内容都必须在调用之前执行。

“async def functions”(正确的名称是“coroutine function”)的行为是不同的:调用语法(例如 factorial() )解析,但这不运行任何代码 - 相反,调用执行 corotine 函数产生一个协程对象 - 然后可以等待或包装在任务或未来中:只有它们才能实际执行函数体中的代码。

因此,对于线程代码,更改如下:


threading.Thread(target=factorial, args=("A", 900), daemon=True).start()
threading.Thread(target=factorial, args=("B", 900), daemon=True).start()
threading.Thread(target=factorial, args=("C", 900), daemon=True).start()
threading.Thread(target=factorial, args=("D", 900), daemon=True).start()

现在,如果您有一台多核机器,您可以将线程示例更改为多处理示例,在创建 Process 实例时同样小心,您应该会看到执行时间与物理 CPU 的数量成正比你拥有的核心。 (最多 4 个,因为您只创建 4 个并行任务)

【讨论】:

    【解决方案2】:

    如果您简化内部函数,则可以更轻松地查看正在发生的事情。像这样。

    import asyncio
    import time
    
    async def factorial(name, number):
        print(f"START: Task {name}: factorial({number})")
        if True:  # or False - change manually
            await asyncio.sleep(1.0)
        else:
            time.sleep(1.0)
        print(f"FIM: Task {name}: factorial({number})")
    
    async def main():
        task1 = asyncio.ensure_future(factorial("A", 900))
        task2 = asyncio.ensure_future(factorial("B", 900))
        task3 = asyncio.ensure_future(factorial("C", 900))
        task4 = asyncio.ensure_future(factorial("D", 900))
        await asyncio.gather(task1, task2, task3, task4)
    
    asyncio.run(main())
    

    True(且不受 CPU 限制)时

    START: Task A: factorial(900)
    START: Task B: factorial(900)
    START: Task C: factorial(900)
    START: Task D: factorial(900)
    FIM: Task A: factorial(900)
    FIM: Task C: factorial(900)
    FIM: Task B: factorial(900)
    FIM: Task D: factorial(900)
    

    False(也不受CPU限制)

    START: Task A: factorial(900)
    FIM: Task A: factorial(900)
    START: Task B: factorial(900)
    FIM: Task B: factorial(900)
    START: Task C: factorial(900)
    FIM: Task C: factorial(900)
    START: Task D: factorial(900)
    FIM: Task D: factorial(900)
    

    不同之处在于等待 asyncio.sleep 将控制权交还给事件循环。调用 time.sleep 不会。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-05-08
      • 2019-06-18
      • 1970-01-01
      • 1970-01-01
      • 2019-01-14
      • 1970-01-01
      • 2019-07-28
      相关资源
      最近更新 更多