【Question title】: multiprocessing always returns an empty file when doing apply_async
【Posted】: 2021-07-14 06:03:02
【Question description】:

I have a file (input.txt) with 500,000 lines. I want to encrypt each line with my encrypt function and save the results to a file named output.txt. For example, given this input.txt:

aab
abb
abc

I want my output.txt to be:

001
011
012

Simple for-loop version

I have a working for loop, but encrypting all the lines takes nearly 9 hours:

encryption_map = {}
encryption_map['a']=0
encryption_map['b']=1
encryption_map['c']=2

def encrypt(input_str):
    output_int = ''
    for i in input_str: 
        for ch in i.split('\n')[0]: # remove line break symbol \n 
            output_int += str(encryption_map[ch])
    return output_int

text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
    with open('output.txt', 'w') as output_file:
        for l in lines:
            output_int = encrypt(l)
            output_file.write(output_int + '\n')    

apply_async version

Since I want to keep the same order in output.txt, it seems I have to use apply_async. My code then becomes:

import multiprocessing as mp

encryption_map = {}
encryption_map['a']=0
encryption_map['b']=1
encryption_map['c']=2

def encrypt(input_str):
    output_int = ''
    for i in input_str: 
        for ch in i.split('\n')[0]: # remove line break symbol \n 
            output_int += str(encryption_map[ch])
    return output_int

def write_result(output):
    output_file.write(ipa_output + '\n')
    # output_file.flush() # This line is suggested by another stack question

pool = mp.Pool(20)

text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
    with open('output.txt', 'w') as output_file:
        for l in lines:
            pool.apply_async(encrypt, args=l, callback=write_result)
pool.close()
pool.join()

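It runs much faster, but output.txt is always empty. What is wrong with my code? I found a post where someone also had trouble writing out a file. It suggested putting f.flush() inside the write function, but that does not work either.

【Question comments】:

    Tags: python python-3.x multithreading multiprocessing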


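    【Solution 1】:

    You need to write args=(line,). The args parameter must be a tuple of positional arguments. With args=l, the string is unpacked into one argument per character, so encrypt raises a TypeError inside the worker. apply_async does not report that exception unless you call get() on the returned result or pass an error_callback, so the callback never runs and nothing is written. Note that the callback in the question also writes ipa_output, which is undefined; it should write its own output parameter.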

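    import multiprocessing as mp


    # Maps each plaintext character to its replacement digit.
    encryption_map = {}
    encryption_map['a'] = 0
    encryption_map['b'] = 1
    encryption_map['c'] = 2


    def encrypt(input_str):
        output_int = ''
        for i in input_str:
            for ch in i.split('\n')[0]:  # skips the line break character
                output_int += str(encryption_map[ch])
        return output_int


    def main():
        #mp.set_start_method('spawn')  # Only needed on OSX
        pool = mp.Pool(2)
        # Open the output file inside main() so that worker processes,
        # which re-import this module under 'spawn', do not truncate it.
        with open('input.txt') as input_file, open('output.txt', 'w') as output_file:

            def write_result(output):
                # Callbacks run in the parent process, one result at a time.
                output_file.write(output + '\n')

            lines = input_file.readlines()
            for line in lines:
                # args must be a tuple, hence the trailing comma.
                pool.apply_async(encrypt, args=(line,), callback=write_result)
            pool.close()
            pool.join()  # wait for all callbacks before the file is closed


    if __name__ == '__main__':
        main()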
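The silent failure in the question can be made visible with a small sketch. The helper name `submit` is hypothetical and only for illustration; `error_callback` and `wait()` are standard parts of the `apply_async` API. The sketch submits one task twice, once with a bare string as `args` and once with a one-element tuple, and collects whatever the success and error callbacks receive.

```python
import multiprocessing as mp

encryption_map = {'a': 0, 'b': 1, 'c': 2}


def encrypt(input_str):
    # Same mapping as in the question, written as a join over the stripped line.
    return ''.join(str(encryption_map[ch]) for ch in input_str.rstrip('\n'))


def submit(pool, args):
    """Hypothetical helper: run one task and return (results, errors)."""
    results, errors = [], []
    handle = pool.apply_async(encrypt, args=args,
                              callback=results.append,
                              error_callback=errors.append)
    handle.wait()  # block until the task has finished, successfully or not
    return results, errors


if __name__ == '__main__':
    with mp.Pool(2) as pool:
        # A bare string is unpacked character by character: encrypt('a', 'a', 'b', '\n')
        print(submit(pool, 'aab\n'))
        # A one-element tuple passes the whole line as a single argument
        print(submit(pool, ('aab\n',)))
```

In the first call the results list stays empty and the errors list holds a TypeError, which is what happened for every line in the question. Without an error_callback (or a call to get()), that exception is never shown.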
    

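    Edit

    In the code above, because we use apply_async, the order of the lines in the output might not be the same as in the input.
    If we want to preserve the order, we can use map/map_async/imap.
    In this case, imap might be the best option because the callback operation (IO-bound) is much slower than the worker operation (CPU-bound):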

    import multiprocessing as mp


    # Maps each plaintext character to its replacement digit.
    encryption_map = {}
    encryption_map['a'] = 0
    encryption_map['b'] = 1
    encryption_map['c'] = 2


    def encrypt(input_str):
        output_int = ''
        for i in input_str:
            for ch in i.split('\n')[0]:  # skips the line break character
                output_int += str(encryption_map[ch])
        return output_int


    def main():
        mp.set_start_method('spawn')  # Only needed on OSX
        pool = mp.Pool(2)
        # Open the output file inside main() so that spawned workers,
        # which re-import this module, do not truncate it.
        with open('input.txt') as input_file, open('output.txt', 'w') as output_file:
            lines = input_file.readlines()
            # imap yields results in input order, so lines are written in order.
            for output in pool.imap(encrypt, lines):
                output_file.write(output + '\n')
        pool.close()
        pool.join()


    if __name__ == '__main__':
        main()
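With 500,000 short lines, sending one line per task may spend more time on inter-process communication than on the encryption itself. `Pool.imap` takes an optional `chunksize` argument that batches several items per task while still yielding results in input order. The following is a minimal sketch on in-memory sample lines; the chunk size of 2 is an arbitrary illustrative value, not a tuned recommendation.

```python
import multiprocessing as mp

encryption_map = {'a': 0, 'b': 1, 'c': 2}


def encrypt(input_str):
    # Same mapping as in the answer, written as a join over the stripped line.
    return ''.join(str(encryption_map[ch]) for ch in input_str.rstrip('\n'))


if __name__ == '__main__':
    sample_lines = ['aab\n', 'abb\n', 'abc\n']  # stands in for the lines of input.txt
    with mp.Pool(2) as pool:
        # chunksize groups lines into batches per task; order is still preserved
        for encrypted in pool.imap(encrypt, sample_lines, chunksize=2):
            print(encrypted)
```

For the real file, the loop body would write each result to output.txt instead of printing it. A larger chunk size is probably worth trying, but the best value depends on the machine and has to be measured.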
    

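    【Comments】:

    • It works, but the order of the output file is messed up... Other posts say I should add r.wait() after r = pool.apply_async. But adding r.wait() makes it as slow as a plain for loop without multiprocessing...
    • Edited the answer.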
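On the r.wait() comment: waiting immediately after each apply_async call serializes the work, because the next task is not submitted until the previous one finishes. One alternative, sketched here as an assumption rather than as the answer author's method, is to submit every task first, keep the returned result objects in a list, and only then call get() on them in submission order. The helper name `encrypt_in_order` is hypothetical.

```python
import multiprocessing as mp

encryption_map = {'a': 0, 'b': 1, 'c': 2}


def encrypt(input_str):
    # Same mapping as in the answer, written as a join over the stripped line.
    return ''.join(str(encryption_map[ch]) for ch in input_str.rstrip('\n'))


def encrypt_in_order(pool, lines):
    """Hypothetical helper: submit all lines, then collect in submission order."""
    # Submit everything up front so the workers stay busy.
    pending = [pool.apply_async(encrypt, args=(line,)) for line in lines]
    # get() blocks only until that particular result is ready and
    # re-raises any exception from the worker instead of hiding it.
    return [result.get() for result in pending]


if __name__ == '__main__':
    with mp.Pool(2) as pool:
        print(encrypt_in_order(pool, ['abc\n', 'aab\n', 'abb\n']))
```

This keeps one result object per line in memory, so for very large inputs imap is likely the lighter option.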