【问题标题】:Python multiprocessing Pool map and imapPython 多处理池映射和 imap
【发布时间】:2017-04-09 06:29:47
【问题描述】:

我有一个 multiprocessing 脚本和 pool.map 可以工作。问题是并非所有进程都需要很长时间才能完成,因此有些进程会因为等待所有进程完成而进入休眠状态(与this question 中的问题相同)。有些文件不到一秒就完成了,有些则需要几分钟(或几小时)。

如果我正确理解手册(and this post)pool.imap 不会等待所有进程完成,如果完成,它会提供一个新文件来处理。当我尝试这样做时,脚本正在加速要处理的文件,按预期处理小文件,大文件(需要更多时间来处理)直到最后才完成(被杀死而不通知?)。这是pool.imap 的正常行为,还是我需要添加更多命令/参数?当我在else 部分中添加time.sleep(100) 作为测试时,它正在处理更多的大文件,但其他进程进入休眠状态。有什么建议 ?谢谢

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # Optionally try to gracefully shut down the worker processes here.       
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    

【问题讨论】:

  • 我一直在思考imap的问题。 Map 正在等待所有进程完成返回结果。 Imap 将在第一个进程完成后立即返回结果,并且可能会终止其他进程并给所有新工作。这可能是正确的吗?

标签: multiprocessing cpu-usage python-3.5 pool


【解决方案1】:

由于您已经将所有文件放入列表中,您可以将它们直接放入队列中。然后与您的子进程共享队列,这些子进程从队列中获取文件名并执行它们的工作。不需要做两次(首先进入列表,然后是 Pool.imap 的泡菜列表)。 Pool.imap 做的完全一样,但你不知道。

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

可以替换为:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

完整的解决方案如下所示:

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        #do stuff until inqueue.get returns "STOP"
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = Queue()
        for infile in os.listdir():  
            todolist.put(infile)
        process = [Process(target=process_file,
                      args=(todolist) for x in range(nprocesses)]
        for p in process:
            #task the processes to stop when all files are handled
            #"STOP" is at the very end of queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()    
if __name__ == '__main__':
    main()

【讨论】:

  • 非常感谢拉贾!现在它正在按我的意愿工作。为了完整性:args=(todolist) for x in range(nprocesses)] 必须是 args=(todolist,)) for x in range(nprocesses)]。前几天晚上我一直在尝试使用 Queue,但到目前为止出现了很多错误。现在我很清楚它是如何工作的!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-05-17
  • 2015-11-17
  • 2015-06-08
  • 2017-04-18
  • 2018-01-28
  • 2023-01-30
  • 1970-01-01
相关资源
最近更新 更多