【问题标题】:Multiprocessing is too slow in multiple loops in PythonPython中的多个循环中的多处理太慢了
【发布时间】:2021-12-15 05:20:58
【问题描述】:

我正在尝试将多循环函数转换为多处理函数以利用多处理。

初始代码如下所示:

from tqdm import tqdm 
import re
import re, string

def transform_data(sentence, aug):
    # some processing from different module function
    out = [re.sub('[%s]' % re.escape(string.punctuation), '', k) for k in sentence]
    return out
    



def bulk_aug(final_df_num):
    
    final_aug_data = []
    
    for data_keys in tqdm(final_df_num):
    
        aug_data   = []
        random_key = data_keys['key']
        sentences  = data_keys['data']

        for chunked_sentence in tqdm(sentences):
            ug_d = transform_data(chunked_sentence, 2)
            aug_data.append(ug_d)
        
        print(aug_data)
        final_aug_data.append({'key': random_key, 
                           'data': [" ".join(k) for k in list(zip(*aug_data))]})
    return final_aug_data

数据如下:

data = [{'key': 12, 'data': [['this is a ?sentence1', 'this is a sentence1'], ['this is a sentence2', 'this is a sentence2'], 
                             ['this is a sentence3', 'this is a sentence3'], ['this is a sentence4', 'this is a sentence4']]},
        
        {'key': 190, 'data': [['this is a sentence11', 'this is a sentence11'], ['this is a sentence22', 'this is a sentence22'], 
                             ['this is a sentence33', 'this is a sentence33'], ['this is a sentence44', 'this is a sentence44']]}, 
        
        {'key': 1900, 'data': [['this is a sentence55', 'this is a sentence55'], ['this is a sentence66', 'this is a sentence66'], 
                              ['this is a sentence77', 'this is a sentence77'], ['this is a sentence88', 'this is a sentence88']]}]

输出如下所示:

# bulk_aug(data)


[{'key': 12,
  'data': ['this is a sentence1 this is a sentence2 this is a sentence3 this is a sentence4',
   'this is a sentence1 this is a sentence2 this is a sentence3 this is a sentence4']},
 {'key': 190,
  'data': ['this is a sentence11 this is a sentence22 this is a sentence33 this is a sentence44',
   'this is a sentence11 this is a sentence22 this is a sentence33 this is a sentence44']},
 {'key': 1900,
  'data': ['this is a sentence55 this is a sentence66 this is a sentence77 this is a sentence88',
   'this is a sentence55 this is a sentence66 this is a sentence77 this is a sentence88']}]

我想将此函数转换为多处理函数。我尝试了什么:

def bulk_aug(final_df_num):
    
    final_aug_data = []
    aug_data       = []
        
    data_keys  = final_df_num
    random_key = data_keys['key']
    sentences  = data_keys['data']

    for chunked_sentence in tqdm(sentences):
        ug_d = transform_data(chunked_sentence, 2)
        aug_data.append(ug_d)
        
        print(aug_data)
        
    final_aug_data.append({'key': random_key, 
                       'data': [" ".join(k) for k in list(zip(*aug_data))]})
    return final_aug_data

from multiprocessing import Pool
import time


with Pool(2) as p:
    r = list(tqdm(p.imap(bulk_aug, data), total=len(data)))

但是处理数据需要花费太多时间。代码有问题吗?

【问题讨论】:

  • 你确定这是由于计算时间而不是imap中IPC的开销?
  • 您确定发布了您正在运行的实际代码吗?您的串行代码不完整,并且您的多处理代码会生成 TypeError: string indices must be integers 错误(与您的串行代码一样)。

标签: python python-3.x multithreading parallel-processing multiprocessing


【解决方案1】:

如果没有大量数据,我无法测试代码的速度,因为创建新进程的开销很大。

原始代码背后的想法是正确的,但大量时间用于将数据(酸洗/解酸洗)传输到新进程或从新进程传出。

这是一个以块的形式传递数据的版本,因此与进程通信的开销要少一些。

def simple_aug(data_keys):
    aug_data   = []
    random_key = data_keys['key']
    sentences  = data_keys['data']

    for chunked_sentence in sentences:
        ug_d = transform_data(chunked_sentence, 2)
        aug_data.append(ug_d)
    
    # print(aug_data)
    return {'key': random_key, 
                       'data': [" ".join(k) for k in list(zip(*aug_data))]}

def bulk_aug_mp(final_df_num, nproc=2):
    with Pool(nproc) as p:
        rv = p.map(simple_aug, final_df_num, chunksize=(len(final_df_num)//nproc)+1)
    return list(rv)

在 Linux 中,如果 data 是一个全局变量并且您在处理过程中不更改它,您可以做得更好:

def bulk_aug_chunks(idx_init, idx_end):
    global data
    final_aug_data = []
    for data_keys in data[idx_init:idx_end]:
        aug_data   = []
        random_key = data_keys['key']
        sentences  = data_keys['data']

        for chunked_sentence in sentences:
            ug_d = transform_data(chunked_sentence, 2)
            aug_data.append(ug_d)
        
        # print(aug_data)
        final_aug_data.append({'key': random_key, 
                           'data': [" ".join(k) for k in list(zip(*aug_data))]})
    return final_aug_data

def bulk_aug_mp2(final_df_num, nproc=2):
    chunk_size = len(final_df_num)//nproc
    ends = [chunk_size * (_+1)  for _ in range(nproc)]
    ends[-1] += len(final_df_num) % nproc
    starts = [chunk_size * _  for _ in range(nproc)]
    with Pool(nproc) as p:
        rv = p.starmap(bulk_aug_chunks, zip(starts, ends))
    return [r for sublist in rv for r in sublist]

【讨论】:

    猜你喜欢
    • 2016-02-14
    • 1970-01-01
    • 1970-01-01
    • 2020-09-25
    • 2016-01-01
    • 1970-01-01
    • 2018-06-15
    • 2018-09-21
    • 2014-10-12
    相关资源
    最近更新 更多