【Question title】: Pool results with a Manager and restart processes using multiprocessing
【Posted】: 2020-07-20 07:14:22
【Question】:

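Suppose I have 500 queries to run using 4 processes. I want to split the queries evenly among the processes, pool the results of those queries, and then restart the processes using those results: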

My pseudocode is:

import multiprocessing as mp

def some_func(subqueries, dict1, dict2, nextIteration):
    # do something to subqueries, add the results to dict1 and dict2,
    # and append the queries for the next round to nextIteration
    pass

def redistribute(queries, n=4):
    # split the queries evenly into n chunks, one per process
    return [queries[i::n] for i in range(n)]

if __name__ == "__main__":
    queries = [...]  # the 500 queries
    iteration = 5
    manager = mp.Manager()
    dict1 = manager.dict()
    dict2 = manager.dict()
    while iteration > 0:
        manager = mp.Manager()  # a new Manager on every iteration
        nextIteration = manager.list()
        redistributedQueries = redistribute(queries)

        # pass the function itself as target and its arguments via args,
        # and start all four processes before joining any of them
        processes = [mp.Process(target=some_func,
                                args=(chunk, dict1, dict2, nextIteration))
                     for chunk in redistributedQueries]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        # get the results from the four processes and use them as the next queries
        queries = list(nextIteration)
        iteration -= 1

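My question is: is it efficient to kill the Manager and then re-instantiate it on every iteration? (I need to use a Manager because the results will be shared.)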
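Starting a Manager launches a separate server process, so re-creating one on every iteration pays that startup cost each round. A minimal sketch of one alternative, assuming `some_func` can return its results instead of appending them to a shared list: create the Manager once for the whole run, keep one `multiprocessing.Pool` of 4 workers alive across rounds, and feed each round's returned results into the next round. The bodies of `some_func` and `redistribute`, and the `run_rounds` wrapper, are hypothetical placeholders.

```python
import multiprocessing as mp

def some_func(subqueries, dict1, dict2):
    # Hypothetical placeholder work: count how many rounds each query has seen,
    # record its chunk size, and return the queries for the next round.
    # Each query is in exactly one chunk per round, so keys never collide.
    results = []
    for q in subqueries:
        dict1[q] = dict1.get(q, 0) + 1
        dict2[q] = len(subqueries)
        results.append(q)
    return results

def redistribute(queries, n):
    # Hypothetical helper: deal the queries round-robin into n chunks
    return [queries[i::n] for i in range(n)]

def run_rounds(queries, iterations=5, n_procs=4):
    # One Manager (one server process) for the whole run, not one per round
    with mp.Manager() as manager:
        dict1 = manager.dict()
        dict2 = manager.dict()
        # One pool of worker processes, reused for every round
        with mp.Pool(n_procs) as pool:
            for _ in range(iterations):
                chunks = redistribute(queries, n_procs)
                # starmap blocks until every chunk is done and returns
                # the per-chunk results in chunk order
                per_chunk = pool.starmap(
                    some_func, [(c, dict1, dict2) for c in chunks])
                queries = [r for chunk in per_chunk for r in chunk]
        # copy() returns plain dicts before the Manager shuts down
        return queries, dict1.copy(), dict2.copy()

if __name__ == "__main__":
    final, d1, d2 = run_rounds(list(range(500)))
    print(len(final), len(d1))  # prints: 500 500
```

Only `dict1` and `dict2` go through the Manager here; the per-round results travel back as ordinary return values from `starmap`, so no shared list has to be rebuilt each iteration.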

【Question discussion】:

    Tags: python multiprocessing distributed-computing


    【Answer 1】:

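    I would rewrite it like this: call the threaded function in a loop, and if the function returns some status code, check it and break out of the loop. To save each thread's output, create a global list with one slot per thread and have each thread store its output in its own slot, for example:

    global out
    out[thread_number] = this_threads_output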
    

    To access the variable outside the thread:

    out[thread_number]  # returns that thread's whole list
    out[thread_number][value_number]  # returns one value from that list
    

    Here thread_number is the thread's index: the first thread you start is 0, the second is 1, and so on.

    Here is an example program:

    import threading
    
    NUM_THREADS = 4  # replace with the number of threads you want
    out = [None] * NUM_THREADS  # one slot per thread, indexed by thread number
    
    def function(thread_number, inputs):  # replace inputs with your real threaded function inputs
        temp = [5, 3, 4]  # replace with your function's code
        out[thread_number] = temp  # store the output in this thread's own slot
    
    threads = []
    for i in range(NUM_THREADS):
        t = threading.Thread(target=function, args=(i, None))  # replace None with your real inputs
        t.start()
        threads.append(t)
    
    # wait until every thread has finished before reading the results
    for t in threads:
        t.join()
    
    print(out[0])  # 0 represents thread 1; replace with your use
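Since the question is about multiprocessing rather than threads, note that a module-level global list is not shared between processes: each child process works on its own copy, so a child's writes never reach the parent. A minimal sketch of the same "one result per worker, indexed by position" idea with processes, using the standard-library `concurrent.futures.ProcessPoolExecutor`; `work` and `run_in_processes` are hypothetical names standing in for the real per-chunk function.

```python
from concurrent.futures import ProcessPoolExecutor

def work(chunk):
    # Hypothetical stand-in for the real per-chunk function
    return [q * 2 for q in chunk]

def run_in_processes(chunks, max_workers=4):
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        # map() yields results in the same order as `chunks`,
        # so out[i] is the result for chunks[i] regardless of finish order
        return list(ex.map(work, chunks))

if __name__ == "__main__":
    out = run_in_processes([[1, 2], [3], [4, 5, 6]])
    print(out[0])  # prints [2, 4]
```

Returning results from the worker function, instead of writing into a global, is what makes this work across process boundaries.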
    

    【Comments】:

    • For the splitting, just divide the total by the number of processes you want.
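Dividing the total by the process count only comes out even when the total is a multiple of that count (500 / 4 = 125). A minimal sketch of a contiguous split that also handles a remainder by giving the first few chunks one extra item; `split_evenly` is a hypothetical helper name.

```python
def split_evenly(items, n):
    # size = items per chunk, extra = leftover items after even division
    size, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        # the first `extra` chunks each take one leftover item
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks

print([len(c) for c in split_evenly(list(range(500)), 4)])  # [125, 125, 125, 125]
print([len(c) for c in split_evenly(list(range(10)), 4)])   # [3, 3, 2, 2]
```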