【问题标题】:Multiprocessing different rows of matrix多处理不同的矩阵行
【发布时间】:2021-12-20 06:09:14
【问题描述】:

我有一个非常大的矩阵(超过 10 万乘 10 万),其计算逻辑可以使每一行与其他行不同

我想使用多处理来优化计算时间(矩阵分成 3 个切片,每个切片 1/3 行)。然而,似乎多处理需要比单个调用更长的时间来计算所有行。我在每个过程中更改矩阵的不同部分 - 这是问题所在吗?

import multiprocessing, os
import time, pandas as pd, numpy as np

def mat_proc(df):
    print("ID of process running worker1: {}".format(os.getpid()))
    return(df+3)  # simplified version of process  
    print('done processing')
          
count=5000

df = pd.DataFrame(np.random.randint(0,10,size=(3*count,3*count)),dtype='int8')
slice1=df.iloc[0:count,]
slice2=df.iloc[count:2*count,]
slice3=df.iloc[2*count:3*count,]

p1=multiprocessing.Process(target=mat_proc,args=(slice1,))
p2=multiprocessing.Process(target=mat_proc,args=(slice2,))
p3=multiprocessing.Process(target=mat_proc,args=(slice3,))

start=time.time()
print('started now')
# this is to compare the multiprocess with a single call to full matrix
#mat_proc(df)

if __name__ == '__main__':   
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    
finish=time.time()
print(f'total time taken {round(finish-start,2)}')

【问题讨论】:

    标签: python pandas dataframe multiprocessing


    【解决方案1】:

    生成进程是一项昂贵的操作。如果您没有在新进程中执行使进程生成时间看起来可以忽略不计的任务,那么最好坚持一个进程。

    另一种选择是使用多线程,它的成本低于多处理。您必须根据数据规模和总处理时间来决定使用哪一个。

    This article 很好地解释了差异和成本。看看吧!

    另外,使用 multiprocessing.pool.Pool 和 multiprocessing.pool.ThreadPool 会更干净。查看下面的示例和official doc 以了解它们的用法。

    from multithreading.pool import Pool, ThreadPool
    
    
    def run_parallel(kls):
        with kls() as pool:
            return pool.map(mat_proc, [df.iloc[0:count,], df.iloc[count: 2 * count, ], df.iloc[2 * count: 3 * count, ]])
    
    
    run_parallel(Pool)        # Run with multiprocessing
    run_parallel(ThreadPool)  # Run with multithreading
    

    【讨论】:

    • 通过使用多线程,他不会看到任何性能提升(在这种情况下),我认为选择是多处理或单进程(可能有一些numba 来加快速度并使用并行性)
    • 为什么不呢?你指的是 GIL 吗?
    • 是的,线程在执行一些 I/O 绑定任务时会加快速度,我认为这里不是这种情况。
    【解决方案2】:

    使用多处理时,将所有脚本部分移动到 if __name__ == '__main__' 部分。因为当每个进程产生时,它都会运行您的主脚本。所以每个进程都必须重新创建数据框、切片等。

    import multiprocessing, os
    import time, pandas as pd, numpy as np
    
    
    def mat_proc(df):
        print("ID of process running worker1: {}".format(os.getpid()))
        return (df + 3)  # simplified version of process
        print('done processing')
    
    
    if __name__ == '__main__':
        count = 5000
    
        df = pd.DataFrame(np.random.randint(0, 10, size=(3 * count, 3 * count)), dtype='int8')
        slice1 = df.iloc[0:count, ]
        slice2 = df.iloc[count:2 * count, ]
        slice3 = df.iloc[2 * count:3 * count, ]
    
        p1 = multiprocessing.Process(target=mat_proc, args=(slice1,))
        p2 = multiprocessing.Process(target=mat_proc, args=(slice2,))
        p3 = multiprocessing.Process(target=mat_proc, args=(slice3,))
    
        start = time.time()
        print('started now')
        # this is to compare the multiprocess with a single call to full matrix
        # mat_proc(df)
    
        p1.start()
        p2.start()
        p3.start()
        p1.join()
        p2.join()
        p3.join()
    
        finish = time.time()
        print(f'total time taken {round(finish - start, 2)}')
    

    并考虑使用multiprocessing.Pool,它可以很方便地通过更改单个数字来选择要生成的进程数。

    第二件事,如果计算很容易(如您提供的流程的简化版本)生成流程,向其发送数据(腌制和取消腌制数据帧)将比这些计算花费更长的时间,并且多处理会更慢。

    【讨论】:

    • 这不是真的。子流程执行从您传递给 Process 构造函数的目标开始。在这种情况下,它的 mat_proc 函数。老实说,这没什么区别。
    • @Crash0v3rrid3 好的,发现这是仅限 Windows 的东西,on Windows it makes a difference。答案的第二部分对 Windows 和 Unix 都有好处 - 如果 OP 只需执行快速操作,多处理将在酸洗输入和输出数据帧方面遇到瓶颈。
    • 只有托管数据结构使用酸洗进行数据交换。在基于 unix 的机器中,当使用 fork 生成新进程时,操作系统会确保复制内存空间(它使用写时复制来提高性能)。所以,这不是什么瓶颈,因为他没有执行写入操作。
    • @Crash0v3rrid3 感谢这些 cmets。 managed datastructures 是什么意思?我不确定 OP 是否没有执行写入(“我在每个进程中更改矩阵的不同部分”)
    • IPC 队列(docs.python.org/3/library/…),例如。
    猜你喜欢
    • 2022-12-01
    • 2014-04-03
    • 1970-01-01
    • 1970-01-01
    • 2020-08-01
    • 2016-06-10
    • 2021-10-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多