【问题标题】:Inverse of ProcessPoolExecutor in PythonPython中ProcessPoolExecutor的逆
【发布时间】:2017-12-19 23:55:04
【问题描述】:

这与我仍在努力解决的earlier problem 有关。本质上,我需要 ProcessPoolExecutor 的逆向设计,其中我有许多查询进程和一个工人,它分批计算和发回结果。

使用一个共享队列很容易发送工作项,但我仍然没有一个很好的解决方案来将所有结果发送回正确进程上的正确线程。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    我认为为每个查询进程设置一个单独的multiprocessing.pipe 是最有意义的。工作进程等待任何管道上的可用项目,然后将其出列并处理它,跟踪它来自哪个管道。当需要发回数据时,它会将结果反馈到正确的管道中。

    这是一个简单的例子:

    #!/usr/bin/env python3
    
    import multiprocessing as mp
    
    def worker(pipes):
        quit = [False] * len(pipes)
        results = [''] * len(pipes)
    
        # Wait for all workers to send None before quitting
        while not all(quit):
            ready = mp.connection.wait(pipes)
            for pipe in ready:
    
                # Get index of query proc's pipe
                i = pipes.index(pipe)
    
                # Receive and "process"
                obj = pipe.recv()
                if obj is None:
                    quit[i] = True
                    continue
                result = str(obj)
                results[i] += result
    
                # Send back to query proc
                pipes[i].send(result)
        print(results)
    
    
    def query(pipe):
        for i in 'do some work':
            pipe.send(i)
            assert pipe.recv() == i
        pipe.send(None) # Send sentinel
    
    if __name__ == '__main__':
        nquery_procs = 8
        work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs)))
    
        query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes]
        for p in query_procs:
            p.start()
        worker(work_pipes)
        for p in query_procs:
            p.join()
    

    或者,您可以为每个查询进程提供一个 ID 号(可能只是其管道的索引),并且任何请求都必须是一个元组,即 (id_num, data)。这只是绕过在每个循环上执行pipes.index(pipe) 的工作进程,所以我不确定它能给你带来多少。

    【讨论】:

    • 谢谢!我实际上在每个进程上都有一个线程池,所以我认为我必须为每个线程制作一个管道。为每个请求(几百/秒)创建一个新的 Pipe() 并在请求管道中发送它是否有很大的开销?或者它必须是一个 Manager().Queue() 才能被挑选?在这种情况下,说 multiprocessing.Queue 是进程安全的甚至意味着什么......
    • @Akababa 您必须进行概要分析才能确定,但​​我敢打赌,为每个请求创建一个新管道是昂贵的。特别是如果每​​个线程发出多个请求,我会在开始时创建它们。
    • 快速跟进:我使用共享队列池使其工作,弹出一个并将其放入工作项中。由于“酸洗”只涉及指向共享内存的指针,这应该是有效的,对吧?
    • @Akababa 如果我理解你的评论,你的意思是你有一个队列池;一个查询过程接受一个并将其与要处理的数据一起发送;工作人员处理并将结果发送回队列;查询过程将其放回池中。如果没错,应该没问题,但是数据是否通过共享内存传输取决于您使用的 IPC 机制。如果它是multiprocessing.Pipe 对象,那么,不,它是通过操作系统管道或套接字发送的,而不是在共享内存中。酸洗将对象打包/解包为要通过此管道发送的字节。
    猜你喜欢
    • 2016-05-25
    • 2020-08-09
    • 2018-06-21
    • 2019-11-19
    • 2013-04-09
    • 1970-01-01
    • 2010-11-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多