【问题标题】:Python Multiprocessing SchedulingPython 多进程调度
【发布时间】:2020-05-09 16:17:05
【问题描述】:

在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行(2+)个进程,我创建了一个最小的代表性示例,如下所示。

我的问题是:

1) 我的理解是,由于我有不同的功能,我不能使用Pool.map_async() 及其变体。那正确吗?我见过的唯一例子是重复相同的功能,比如this answer

2) 使此设置永久运行的最佳做法是什么?在下面的代码中,我使用了 while 循环,我怀疑它不适合此目的。

3) 我使用ProcessManager 的方式是否最佳?我使用multiprocessing.Manager.dict() 作为共享字典从进程中返回结果。我在this answer 的评论中看到,在这里使用Queue 是有意义的,但是Queue 对象没有`.dict()' 方法。所以,我不确定这将如何工作。

如果有任何改进和建议使用示例代码,我将不胜感激。

import numpy as np
import pandas as pd
import multiprocessing
import time

def worker1(name, t , seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    time.sleep(t)
    np.random.seed(seed)
    df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

def worker2(name, t, seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    np.random.seed(seed)
    time.sleep(t)
    df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

if __name__ == '__main__':
    t=1
    while True:

        start_time = time.time()
        manager = multiprocessing.Manager()
        parallel_dict = manager.dict()
        seed=np.random.randint(0,1000,1) # send seed to worker to return a diff df
        jobs = []
        p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
        p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
        jobs.append(p1)
        jobs.append(p2)
        p1.start()
        p2.start()
        for proc in jobs:
            proc.join()
        parallel_end_time = time.time() - start_time
        #print(parallel_dict)
        df1= pd.DataFrame(parallel_dict['name1'][1:],columns=parallel_dict['name1'][0])
        df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
        merged_df = pd.concat([df1,df2], axis=0)
        print(merged_df)

【问题讨论】:

    标签: python-3.x multiprocessing queue scheduled-tasks


    【解决方案1】:

    答案 1(多个功能的映射)

    您在技术上是正确的。 对于 map、map_async 和其他变体,您应该使用单个函数。

    但是这个约束可以通过实现一个执行器来绕过,并将函数作为参数的一部分传递给执行:

    def dispatcher(args):
        return args[0](*args[1:])
    

    所以一个最小的工作示例:

    import multiprocessing as mp
    
    def function_1(v):
        print("hi %s"%v)
        return 1
        
    def function_2(v):
        print("by %s"%v)
        return 2
    
    def dispatcher(args):
        return args[0](*args[1:])
    
    with mp.Pool(2) as p:
        tasks = [
            (function_1, "A"),
            (function_2, "B")
        ]
        r = p.map_async(dispatcher, tasks)
        r.wait()
        results = r.get()
    

    答案 2(调度)

    我会从脚本中删除 while 并安排一个 cron 作业 (on GNU/Linux) (on windows),以便操作系统负责它的执行。

    在 Linux 上,您可以运行 cronotab -e 并添加以下行以使脚本每 5 分钟运行一次。

    */5 * * * * python /path/to/script.py
    

    答案 3(共享字典)

    是,但不是。

    据我所知,使用管理器处理集合等数据是最好的方法。 对于数组或原始类型(int、floats、ecc)存在 ValueArray which are faster

    documentation

    Manager() 返回的管理器对象控制着一个服务器进程,该服务器进程包含 > Python 对象并允许其他进程使用代理来操作它们。

    Manager() 返回的管理器将支持类型 list、dict、Namespace、Lock、> RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value 和 > Array。

    服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存要慢。

    但您只需返回一个 Dataframe,因此不需要共享字典。

    清理代码

    使用之前的所有想法,代码可以重写为:

    地图版本

    import numpy as np
    import pandas as pd
    from time import sleep
    import multiprocessing as mp
    
    def worker1(t , seed):
        print('worker1 is here.')
        sleep(t)
        np.random.seed(seed)
        return pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
         
    
    def worker2(t , seed):
        print('worker2 is here.')
        sleep(t)
        np.random.seed(seed)
        return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))
    
    def dispatcher(args):
        return args[0](*args[1:])
    
    def task_generator(sleep_time=1):
        seed = np.random.randint(0,1000,1)
        yield worker1, sleep_time, seed    
        yield worker2, sleep_time, seed + 1
    
    with mp.Pool(2) as p:
        results = p.map(dispatcher, task_generator())
        merged = pd.concat(results, axis=0)
        print(merged)
    

    如果 Dataframe 的串联过程是瓶颈,则使用 imap 的方法可能会变得最优。

    imap 版本

    with mp.Pool(2) as p:
        merged = pd.DataFrame()
        for result in p.imap_unordered(dispatcher, task_generator()):
            merged = pd.concat([merged,result], axis=0)
        print(merged)
    

    主要区别在于,在map的情况下,程序先等待所有的流程任务结束,然后再拼接所有的Dataframe。

    在 imap_unoredered 情况下,一旦任务结束,Dataframe 就会连接到当前结果。

    【讨论】:

    • 非常感谢。为了让worker1`和worker2并行运行,我是否不必使用map_async()而不是map?你使用map()是有原因的吗?
    • 没问题! map 将任务分配给 Pool 上的所有进程。所以是平行的!不同之处在于 map 是阻塞的,而 map_async 是非阻塞的。因此,使用 map 您必须等待所有任务完成,而使用 map_async 您将获得一些结果对象,然后您必须调用 result.wait() 和 result.get()。
    猜你喜欢
    • 1970-01-01
    • 2012-06-21
    • 1970-01-01
    • 1970-01-01
    • 2011-05-14
    • 1970-01-01
    • 1970-01-01
    • 2017-03-24
    • 1970-01-01
    相关资源
    最近更新 更多