【问题标题】:Parallelizing different functions at the same time in python在python中同时并行化不同的函数
【发布时间】:2016-11-04 17:20:56
【问题描述】:

我想同时执行 f1 和 f2。但是下面的代码不起作用!

from multiprocessing import Pool

def f1(x):
return x*x

def f2(x):
return x^2

if __name__ == '__main__':

    x1=10
    x2=20
    p= Pool(2)
    out=(p.map([f1, f2], [x1, x2]))

y1=out[0]
y2=out[1]

【问题讨论】:

  • 如果这是你想要的,你不能同时运行多个线程,启动一个线程本身需要一些时间。总会有时间差的,为什么不一个个一个个地启动这些线程呢?参考this docs
  • 请问您为什么要这样做?
  • @Micha90 我的函数非常耗时(在集群上运行 5 天!),所以为了节省时间,我需要并行运行它们。
  • @Jezor 如果有时间差也没关系,我只需要并行运行它们!
  • 好的,我没有时间阅读文档,但是您尝试过out[0]=p.map(f1, x1)out[1]=p.map(f2, x2) 吗?也许图书馆不能处理多个功能?只是猜测......

标签: python multithreading parallel-processing subprocess


【解决方案1】:

我想同时执行 f1 和 f2。但下面的代码不起作用! ...

out=(p.map([f1, f2], [x1, x2]))

对代码的最小更改是将 p.map() 调用替换为:

r1 = p.apply_async(f1, [x1])
out2 = f2(x2)
out1 = r1.get()

尽管如果您只想同时运行两个函数调用,那么您不需要这里的Pool(),您可以手动启动一个线程/进程和use Pipe/Queue to get the result

#!/usr/bin/env python
from multiprocessing import Process, Pipe

def another_process(f, args, conn):
    conn.send(f(*args))
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=another_process, args=(f1, [x1], child_conn))
    p.start()
    out2 = f2(x2)
    out1 = parent_conn.recv()
    p.join()

【讨论】:

    【解决方案2】:

    我相信您想在代码中使用threading.Threadshared queue

    from queue import Queue
    from threading import Thread
    import time
    
    def f1(q, x):
        # Sleep function added to compare execution times.
        time.sleep(5)
        # Instead of returning the result we put it in shared queue.
        q.put(x * 2)
    
    def f2(q, x):
        time.sleep(5)
        q.put(x ^ 2)
    
    if __name__ == '__main__':
        x1 = 10
        x2 = 20
        result_queue = Queue()
    
        # We create two threads and pass shared queue to both of them.
        t1 = Thread(target=f1, args=(result_queue, x1))
        t2 = Thread(target=f2, args=(result_queue, x2))
    
        # Starting threads...
        print("Start: %s" % time.ctime())
        t1.start()
        t2.start()
    
        # Waiting for threads to finish execution...
        t1.join()
        t2.join()
        print("End:   %s" % time.ctime())
    
        # After threads are done, we can read results from the queue.
        while not result_queue.empty():
            result = result_queue.get()
            print(result)
    

    上面的代码应该打印输出类似于:

    Start: Sat Jul  2 20:50:50 2016
    End:   Sat Jul  2 20:50:55 2016
    20
    22
    

    如您所见,即使两个函数都等待 5 秒以产生结果,但它们是并行执行的,因此总执行时间为 5 秒。

    如果您关心哪个函数将什么结果放入队列中,我可以看到两种解决方案可以确定这一点。您可以创建多个队列或将结果包装在一个元组中。

    def f1(q, x):
        time.sleep(5)
        # Tuple containing function information.
        q.put((f1, x * 2))
    

    为了进一步简化(尤其是当你有很多函数要处理时),你可以装饰你的函数(以避免重复代码并允许函数调用没有队列):

    def wrap_result(func):
        def wrapper(*args):
            # Assuming that shared queue is always the last argument.
            q = args[len(args) - 1]
            # We use it to store the results only if it was provided.
            if isinstance(q, Queue):
                function_result = func(*args[:-1])
                q.put((func, function_result))
            else:
                function_result = func(*args)
            return function_result
    
        return wrapper
    
    @wrap_result
    def f1(x):
        time.sleep(5)
        return x * 2
    

    请注意,我的装饰器是匆忙编写的,它的实现可能需要改进(例如,如果你的函数接受 kwargs)。如果您决定使用它,则必须以相反的顺序传递参数:t1 = threading.Thread(target=f1, args=(x1, result_queue))

    一点友好的建议。

    “以下代码不起作用”没有说明问题。是否引发异常?是不是产生了意想不到的结果?

    阅读错误消息很重要。更重要的是——研究它们的含义。您提供的代码会引发 TypeError 并带有非常明显的消息:

    File ".../stack.py", line 16, in <module> out = (p.map([f1, f2], [x1, x2]))

    TypeError: 'list' object is not callable

    这意味着Pool().map() 的第一个参数必须是callable 对象,例如一个函数。让我们看看该方法的文档。

    对iterable中的每个元素应用func,将结果收集在一个 返回的列表。

    它显然不允许将函数列表作为参数传递。

    Here 你可以阅读更多关于Pool().map() 方法的信息。

    【讨论】:

    • 非常感谢您的回复。这很有帮助。我想知道 f1 的计算时间是否比 f2 长,那么“结果”中的第一行将是 f2 吗?我测试了一个简单的函数,结果与函数(f1和f2)相对应。我是否也应该在代码中实现您的第二条评论?
    • 是的,值将被插入到按函数执行时间排序的队列中,这里没有什么反直觉的(函数是并行工作的,第一个完成执行的就是第一个产生结果的)。这完全取决于您的意图,如果您需要知道哪个结果对应于哪个功能,那么是的,您必须使结果与众不同。
    • 感谢您的“友好建议”,我刚刚看到了:) 我知道我提供的代码不起作用,我只是为了说明我想用我的代码做什么而编写它...您对同时调用不同函数的第一条评论确实很有帮助,但是为了解决上述问题,我实际上并没有得到确切的答案!当我在代码中添加第二条注释时,它给了我以下错误: NameError: "global name 'wrapper' is not defined" .....你能解释一下我应该如何实现其余部分吗?代码...提前致谢。
    • @Jezor 当然。但是因为我是这个网站的新手,所以我现在不能投票:(我投了赞成票,我认为一旦我获得更多积分,它将适用于未来:)
    猜你喜欢
    • 2019-07-14
    • 1970-01-01
    • 2012-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多