【问题标题】:How to use multiprocessing to parallelize two calls to the same function, with different arguments, in a for loop?如何在 for 循环中使用多处理并行化对具有不同参数的同一函数的两次调用?
【发布时间】:2018-03-07 06:01:52
【问题描述】:

在 for 循环中,我调用一个函数两次,但使用不同的参数集(argSet1argSet2在 for 循环的每次迭代中都会发生变化。我想并行化这个操作,因为一组参数导致被调用函数运行得更快,而另一组参数导致函数运行缓慢。请注意,我不想为此操作使用两个 for 循环。我还有另一个要求:这些函数中的每一个都将执行一些并行操作,因此我不希望 argSet1argSet2 的任何函数运行不止一次,因为我计算的资源有限有。确保具有两个参数集的函数都在运行将帮助我尽可能多地利用 CPU 内核。以下是在没有并行化的情况下正常执行的方法:

def myFunc(arg1, arg2):
    if arg1:
        print ('do something that does not take too long')
    else:
       print ('do something that takes long')

for i in range(10):
    argSet1 = arg1Storage[i]
    argSet1 = arg2Storage[i]
    myFunc(argSet1)
    myFunc(argSet2)

这绝对不会利用我拥有的计算资源。这是我尝试并行化操作:

from multiprocessing import Process

def myFunc(arg1, arg2):
    if arg1:
        print ('do something that does not take too long')
    else:
       print ('do something that takes long')

for i in range(10):
    argSet1 = arg1Storage[i]
    argSet1 = arg2Storage[i]
    p1 = Process(target=myFunc, args=argSet1)
    p1.start()
    p2 = Process(target=myFunc, args=argSet2)
    p2.start()

但是,这样每个函数及其各自的参数将被调用 10 次,并且事情变得非常缓慢。鉴于我对多处理的了解有限,我试图通过在 for 循环的末尾添加 p1.join()p2.join() 来进一步改进,但这仍然会导致速度变慢,因为 p1 完成得更快,事情要等到 @ 987654330@ 已完成。我还考虑过使用multiprocessing.Value 与函数进行一些通信,但是我必须在函数内为每个函数调用添加一个while 循环,这会再次减慢一切。我想知道是否有人可以提供一个实用的解决方案?

【问题讨论】:

  • 如果你只想调用一次,为什么要在每个 argSet 的 for 循环中调用 myFunc 10 次?
  • @dnswlt 对不起,我忘了提到每次迭代的参数都会改变。现在更新我的问题
  • 你应该使用一个进程池(不管你想要多少),并在开始时提交所有作业,并让它们全部运行到完成。这样您就可以在整个运行过程中利用所有可用的内核,而目前两个并行作业中较快的一个将首先完成,然后该内核上不会发生任何事情。
  • @JohnZwinck 谢谢,但请您尝试发布答案吗?我对多处理还很陌生,不清楚如何正确使用它。
  • 我认为您对需求的定义不够明确。 1.您不想并行化所有运行 2.您不想一次仅并行化 2 个调用(您的最后一次尝试)。所以你想要什么?一次并行化所有 10 个快速调用,然后依次运行慢速调用?在顺序运行的同时运行慢速调用并行运行快速运行?这种情况下,当慢跑结束时,要不要进入下一个慢跑,仍然并行快速运行?

标签: python python-3.x ubuntu for-loop multiprocessing


【解决方案1】:

因为我在补丁中构建了这个答案,所以向下滚动以获得解决这个问题的最佳方法

您需要准确地指定您希望事情如何运行。据我所知,您最多希望两个进程运行,但也至少要运行。此外,您不希望繁重的呼叫阻碍快速呼叫。一种简单的非最佳运行方式是:

from multiprocessing import Process

def func(counter,somearg):
    j = 0
    for i in range(counter): j+=i
    print(somearg)

def loop(counter,arglist):
    for i in range(10):
        func(counter,arglist[i])

heavy = Process(target=loop,args=[1000000,['heavy'+str(i) for i in range(10)]])
light = Process(target=loop,args=[500000,['light'+str(i) for i in range(10)]])
heavy.start()
light.start()
heavy.join()
light.join()

这里的输出是(对于一个示例运行):

light0
heavy0
light1
light2
heavy1
light3
light4
heavy2
light5
light6
heavy3
light7
light8
heavy4
light9
heavy5
heavy6
heavy7
heavy8
heavy9

您可以看到最后一部分是次优的,因为您有一系列繁重的运行 - 这意味着有一个进程而不是两个。

如果您可以估计繁重的进程运行了多长时间,则可以轻松地对此进行优化。如果它比这里慢一倍,只需先运行 7 次重迭代,加入轻进程,然后让它运行额外的 3 次。

另一种方法是成对运行繁重的进程,所以首先你有3个进程,直到快速进程结束,然后继续2个。

重点是将重调用和轻调用完全分离到另一个进程 - 因此,当快速调用一个接一个地快速完成时,您可以处理慢的东西。一旦快速结束,您要继续进行多详细的操作取决于您,但我认为现在估计如何分解繁重的呼叫就足够了。这是我的例子:

from multiprocessing import Process

def func(counter,somearg):
    j = 0
    for i in range(counter): j+=i
    print(somearg)

def loop(counter,amount,arglist):
    for i in range(amount):
        func(counter,arglist[i])

heavy1 = Process(target=loop,args=[1000000,7,['heavy1'+str(i) for i in range(7)]])
light = Process(target=loop,args=[500000,10,['light'+str(i) for i in range(10)]])
heavy2 = Process(target=loop,args=[1000000,3,['heavy2'+str(i) for i in range(7,10)]])
heavy1.start()
light.start()
light.join()
heavy2.start()
heavy1.join()
heavy2.join()

有输出:

light0
heavy10
light1
light2
heavy11
light3
light4
heavy12
light5
light6
heavy13
light7
light8
heavy14
light9
heavy15
heavy27
heavy16
heavy28
heavy29

更好的利用率。您当然可以通过为慢速进程运行共享一个队列来使其更高级,因此当快速完成后,他们可以作为工作人员加入慢速队列,但是对于只有两个不同的调用,这可能是矫枉过正(尽管使用起来并不难queue)。 最佳解决方案

from multiprocessing import Queue,Process
import queue

def func(index,counter,somearg):
    j = 0
    for i in range(counter): j+=i
    print("Worker",index,':',somearg)

def worker(index):
    try:
        while True:
            func,args = q.get(block=False)
            func(index,*args)
    except queue.Empty: pass

q = Queue()
for i in range(10):
    q.put((func,(500000,'light'+str(i))))
    q.put((func,(1000000,'heavy'+str(i))))

nworkers = 2
workers = []
for i in range(nworkers):
    workers.append(Process(target=worker,args=(i,)))
    workers[-1].start()
q.close()
for worker in workers:
    worker.join()

这是您想要的最佳和最具可扩展性的解决方案。输出:

Worker 0 : light0
Worker 0 : light1
Worker 1 : heavy0
Worker 1 : light2
Worker 0 : heavy1
Worker 0 : light3
Worker 1 : heavy2
Worker 1 : light4
Worker 0 : heavy3
Worker 0 : light5
Worker 1 : heavy4
Worker 1 : light6
Worker 0 : heavy5
Worker 0 : light7
Worker 1 : heavy6
Worker 1 : light8
Worker 0 : heavy7
Worker 0 : light9
Worker 1 : heavy8
Worker 0 : heavy9

【讨论】:

  • 很好的答案!谢谢!我认为Queue 解决方案更适用于我的工作。
  • @Amir,这是“正确”的解决方案。并且会更好地适应不同数量的工作和工人。当您只想将工作“喂”给“工人”时,这是经典的工作模式。
  • 我刚刚意识到所有这些方法都存在问题。我发现我正在运行的函数是类中的函数 (myClass.myFunc)。在__init__(self) 类中,我定义了一些变量(想象它们就像计数器)。然后内部函数(我想并行化)更新这些变量,我想在myClass.myFunc 调用之外使用它们。但是,由于Process 以某种方式创建了自己的环境,因此我不能在类之外使用这些变量。很抱歉增加了问题的复杂性,但有解决方案吗?
  • 它不必使用多处理,但我只是不想使用线程,因为线程不会使用所有可用的内核(据我所知)。
  • @Amir 您不断向问题添加新要求,这正在演变为聊天。 “里面”是什么意思?我建议这个 - 你解决了你的工作模式,现在你想问一下共享内存。写一个只涉及两个进程的新问题,以及您希望它们如何共享内存(因此简化我们在此处解决的硬/轻进程问题) - 将其链接到此处,我会看看。
【解决方案2】:

您可能想要使用multiprocessing.Pool 进程并将您的myFunc 映射到其中,如下所示:

from multiprocessing import Pool
import time

def myFunc(arg1, arg2):
    if arg1:
        print ('do something that does not take too long')
        time.sleep(0.01)
    else:
       print ('do something that takes long')
       time.sleep(1)

def wrap(args):
    return myFunc(*args)

if __name__ == "__main__":
    p = Pool()
    argStorage = [(True, False), (False, True)] * 12
    p.map(wrap, argStorage)

我添加了一个wrap 函数,因为传递给p.map 的函数必须接受一个参数。如果您的情况可能,您也可以调整 myFunc 以接受元组。

我的示例appStorage 包含 24 个项目,其中 12 个将需要 1 秒处理,12 个将在 10 毫秒内完成。总的来说,这个脚本在 3-4 秒内运行(我有 4 个核心)。

【讨论】:

    【解决方案3】:

    一种可能的实现方式如下:

    import concurrent.futures
    import math
    
    list_of_args = [arg1, arg2]
    
    def my_func(arg):
        ....
        print ('do something that takes long')
    
    def main():
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for arg, result in zip(list_of_args, executor.map(is_prime, list_of_args)):
                print('my_func({0}) => {1}'.format(arg, result))
    

    executor.map 就像内置函数,map 方法允许多次调用提供的函数,将迭代中的每个项目传递给该函数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-08
      • 1970-01-01
      • 2014-02-05
      • 1970-01-01
      • 2020-01-22
      相关资源
      最近更新 更多