【问题标题】:How to write a multithreaded function for processing different tasks concurrently?如何编写一个多线程函数来同时处理不同的任务?
【发布时间】:2018-10-25 08:49:43
【问题描述】:

我想在 python 中定义一个do_in_parallel 函数,它将接收带参数的函数,为每个函数创建一个线程并并行执行它们。该函数应该这样工作:

do_in_parallel(_sleep(3), _sleep(8), _sleep(3))

然而,我很难定义 do_in_parallel 函数来接受多个函数,每个函数都有多个参数,这是我的尝试:

from time import sleep
import threading

def do_in_parallel(*kwargs):

    tasks = []

    for func in kwargs.keys():
        t = threading.Thread(target=func, args=(arg for arg in kwargs[func]))
        t.start()
        tasks.append(t)

    for task in tasks:        
        task.join()

def _sleep(n):
    sleep(n)
    print('slept', n)

这样使用它,并得到以下错误:

do_in_parallel(_sleep=3, _sleep=8, _sleep=3)

>> do_in_parallel(sleepX=3, sleepX=8, sleepX=3)
                            ^
>> SyntaxError: keyword argument repeated

有人可以解释一下我需要在我的函数中进行哪些更改,以便它可以采用多个函数参数:

do_in_parallel(_sleep(3), _sleep(8), maybe_do_something(else, and_else))

【问题讨论】:

  • @VineethSai 嗯...只有当我将 args 部分从线程中取出,但它不会并行执行它们
  • list代替**kwargs**,除了这个一个for <args>就够了。
  • @stovfl 你是什么意思'除了这个 就足够了'
  • 你在做for func in kwargs.keys():,在for arg in kwargs[func]里面。更改为args=kwargs[func]

标签: python multithreading function concurrency arguments


【解决方案1】:

do_in_parallel(_sleep(3), _sleep(8), maybe_do_something(else, and_else))

这个调用结构无论如何都不起作用,因为您将目标函数的结果传递给do_in_parallel(您已经在调用 _sleep 等)。

您需要做的是捆绑任务并将这些任务传递给您的处理函数。这里的任务是一个元组,包含要调用的目标函数和一个参数元组task = (_sleep, (n,))

我建议您然后使用 ThreadPool 和 apply_async 方法来处理单独的任务。

from time import sleep
from multiprocessing.dummy import Pool  # .dummy.Pool is a ThreadPool


def _sleep(n):
    sleep(n)
    result = f'slept {n}'
    print(result)
    return result


def _add(a, b):
    result = a + b
    print(result)
    return result


def do_threaded(tasks):
    with Pool(len(tasks)) as pool:
        results = [pool.apply_async(*t) for t in tasks]
        results = [res.get() for res in results]
    return results


if __name__ == '__main__':

    tasks = [(_sleep, (i,)) for i in [3, 8, 3]]
    # [(<function _sleep at 0x7f035f844ea0>, (3,)),
    #  (<function _sleep at 0x7f035f844ea0>, (8,)),
    #  (<function _sleep at 0x7f035f844ea0>, (3,))]
    tasks += [(_add, (a, b)) for a, b in zip(range(0, 3), range(10, 13))]

    print(do_threaded(tasks))

输出:

10
12
14
slept 3
slept 3
slept 8
['slept 3', 'slept 8', 'slept 3', 10, 12, 14]

Process finished with exit code 0

【讨论】:

  • 只是确保“from multiprocessing.dummy import Pool”这是多线程,虽然它来自多处理包?
  • @callmeGuy 完全正确。它是“虚拟的”,因为它使用线程而不是进程,但在其他方面提供了相同的方法。它是multiprocessing.pool.ThreadPool 的包装器,如果您愿意,可以直接导入后者。
  • 这很酷,谢谢。你认为我可以添加读写 pandas df 的函数吗?比如我需要什么锁吗?
  • @callmeGuy 如果您需要锁来保持数据一致取决于您正在做什么,这不是一般情况下可以回答的问题。如果您考虑到这一点,请记住,在执行非原子操作时,线程可以随时中断。因此,在您需要事务不可中断的情况下,您将需要锁定。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-10
相关资源
最近更新 更多