【Question title】: Wait for pool.apply_async inside a loop
【Posted】: 2020-04-04 03:18:33
【Question description】:

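This is my first time trying to use multiprocessing in my Python code. I'm stuck because I can't get apply_async to wait for all of its processes to finish. While going through a long list of elements, I want to process them in smaller chunks and save the results.

As a simpler example: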

import multiprocessing as mp

def fun(x, y):
    print("Here")
    return(x+y)

buffer = []

for val in range(10):
    buffer.append(val)
    print(f'Added value: {val}')
    if len(buffer) == 5:
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            res = [r.wait() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()

I would expect it to produce the following output:

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Here
Here
Here
Here
Here
Results: [0, 2, 4, 6, 8]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [10, 12, 14, 16, 18]

But it actually produces this (at least on my machine):

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]

Any advice is greatly appreciated.

【Question discussion】:

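  • If I use map, things get better (it actually outputs the results), but the loop still runs in the background and produces a lot of extra output, just like in the OP.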
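The observation that map "actually outputs the results" matches how Pool.map and Pool.starmap behave: each call blocks until every item in the chunk has been processed and returns the results in input order. A minimal sketch, assuming the two-argument fun from the question (hence starmap rather than map); process_chunk is a hypothetical helper name, not part of the question:

```python
import multiprocessing as mp


def fun(x, y):
    # Worker from the question; kept at module level so workers can import it.
    return x + y


def process_chunk(pool, buffer):
    # starmap unpacks each (x, y) tuple into fun(x, y) and, like map,
    # blocks until the whole chunk is done, returning results in order.
    return pool.starmap(fun, [(x, x) for x in buffer])


if __name__ == "__main__":
    with mp.Pool() as pool:
        print(process_chunk(pool, [0, 1, 2, 3, 4]))  # [0, 2, 4, 6, 8]
```

Switching to starmap by itself does not remove the extra "Added value" lines; those depend on where the loop sits relative to the if __name__ == "__main__": guard.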

Tags: python multiprocessing python-multiprocessing


【Solution 1】:

Try putting the entire for loop inside the if __name__ == '__main__': suite.

...
if __name__ == '__main__':

    for val in range(10):
        buffer.append(val)
        print(f'Added value: {val}')
        if len(buffer) == 5:
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            # wait until they are ALL done
            for r in res:
                r.wait()
            # get the return values
            res = [r.get() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()
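A complete, self-contained variant of the structure above, offered as a sketch: like the answer, it keeps every line that starts processes under the guard, but (a design choice not taken from the answer) it creates one pool for the whole run and reuses it for each chunk instead of creating and closing a pool per chunk. chunked and process_in_chunks are hypothetical helper names.

```python
import multiprocessing as mp
from itertools import islice


def fun(x, y):
    # Defined at module level so worker processes can find it.
    return x + y


def chunked(iterable, size):
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def process_in_chunks(pool, values, chunk_size=5):
    """Submit each chunk to `pool`, wait for it, and collect the results."""
    all_results = []
    for chunk in chunked(values, chunk_size):
        print(f"Added values: {chunk}")
        async_results = [pool.apply_async(fun, args=(x, x)) for x in chunk]
        # get() blocks until each task has finished and returns fun's value.
        results = [r.get() for r in async_results]
        print(f"Results: {results}")
        all_results.extend(results)  # e.g. save this chunk to disk here instead
    return all_results


if __name__ == "__main__":
    # Everything that starts processes stays under the guard.
    with mp.Pool() as pool:
        process_in_chunks(pool, range(10))
```

Leaving the with block calls terminate() on the pool, which is safe here because every result has already been collected with get(). Passing the pool in as a parameter also lets the chunking logic be exercised with multiprocessing.pool.ThreadPool, which offers the same apply_async interface.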

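Here is your original with some extra checks added. I still don't know why, but it seems that the lines in the for loop are somehow being run in several Python processes.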

import multiprocessing as mp
import os
import pickle
import sys

def fun(x, y, pid=None):
    print(f"here pid:{pid}",file=sys.stderr)
    return (x+y,pid)

buffer = []
stuff = []

with open(r'c:\pyProjects\stuff.pkl','wb') as f:
    pickle.dump(stuff,f)

for val in range(10):
    buffer.append(val)
    pid = os.getpid()
    print(f'Added value: {val}.   pid={pid}')
    d = {'val':val,'pid':pid}
    with open(r'c:\pyProjects\stuff.pkl','rb') as f:
        try:
            stuff = pickle.load(f)
            stuff.append(d)
        except EOFError as e:
            s = '\n'.join(f'\t\t\t\t{item}' for item in stuff)
            print(f'\t\t\tEOFError {d}\n\t\t\tstuff:\n{s}\n')
    with open(r'c:\pyProjects\stuff.pkl','wb') as f:
        pickle.dump(stuff,f)
    if len(buffer) == 5:
        print(buffer)
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x,pid)) for x in buffer]
            res = [r.get() for r in res]
            print(f'\t\t\tResults: {res}')
            buffer = []
            pool.close()
            pool.join()
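A likely explanation for the repeated loop output: on Windows, multiprocessing uses the "spawn" start method, in which each worker starts a fresh interpreter and re-imports the main script under the module name __mp_main__. Any top-level code that is not inside if __name__ == "__main__":, such as the question's for loop, therefore runs again in every worker, while the guarded pool code is skipped there. The sketch below requests "spawn" through mp.get_context so the effect can be seen on any OS; the top-level print is the only thing it is meant to demonstrate.

```python
import multiprocessing as mp
import os

# Unguarded top-level code. Under "spawn" (the default on Windows and macOS),
# each worker re-imports this script as "__mp_main__", so this line prints
# once in the parent and once more in every worker.
print(f"top-level code: __name__={__name__!r}, pid={os.getpid()}")

if __name__ == "__main__":
    # Request "spawn" explicitly so Linux behaves like Windows here.
    ctx = mp.get_context("spawn")
    pool = ctx.Pool(2)
    print(pool.map(abs, [-1, -2, 3]))  # [1, 2, 3]
    # close() + join() let both workers start up and exit cleanly.
    pool.close()
    pool.join()
```

The order of the lines printed by the workers can vary between runs, since they start concurrently.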

When it has finished, you can load and read the pickle file with:
>>> import pickle
>>> from pprint import pprint
>>> with open(r'c:\pyProjects\stuff.pkl','rb') as f:
...     a = pickle.load(f)

>>> a.sort(key=lambda x: x['pid'])
>>> pprint(a)
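If the pickled list is long, grouping the records by pid can make it easier to see which process added which values. A minimal sketch; vals_by_pid and the sample records are hypothetical, shaped like the {'val': ..., 'pid': ...} dicts the script above writes:

```python
from collections import defaultdict


def vals_by_pid(records):
    """Group the 'val' entries of {'val': ..., 'pid': ...} records by pid."""
    grouped = defaultdict(list)
    for rec in records:
        grouped[rec["pid"]].append(rec["val"])
    return dict(grouped)


# Hypothetical records, for illustration only.
sample = [{"val": 0, "pid": 100}, {"val": 1, "pid": 100}, {"val": 0, "pid": 200}]
print(vals_by_pid(sample))  # {100: [0, 1], 200: [0]}
```

With the real file, pass the loaded list instead, e.g. vals_by_pid(pickle.load(f)).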

【Discussion】:

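  • That worked! Thank you. Could you give, or point me to, an explanation of what is going on? I'm really confused... but thanks again.
  • @non87 - I don't know why your original behaves that way. Only two Results lines were printed, and the values are correct (if you change to res = [r.get() for r in res]); it is as if the extra prints were echoes. I suppose a little introspection might help figure it out. Frankly, your code structure looks funny, so I just tried it the way I thought it should be, and it worked.
  • Thanks! Once I have the code up and running, I'll actually look at the pickle file. As I mentioned, this is my first time using the mp library, so I don't really understand what's happening, and my implementation is indeed funny.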
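The r.get() change mentioned in the comments accounts for the [None, None, None, None, None] results: AsyncResult.wait() only blocks until the task has finished and returns None, whereas AsyncResult.get() blocks and returns the task's return value (or re-raises its exception). The sketch uses multiprocessing.pool.ThreadPool, which shares Pool's apply_async/AsyncResult interface, only so that it runs without starting processes; compare_wait_and_get is a hypothetical name.

```python
from multiprocessing.pool import ThreadPool


def fun(x, y):
    return x + y


def compare_wait_and_get(values):
    # ThreadPool has the same apply_async/AsyncResult API as Pool,
    # but runs tasks in threads, so no worker processes are started.
    with ThreadPool(2) as pool:
        async_results = [pool.apply_async(fun, args=(x, x)) for x in values]
        waited = [r.wait() for r in async_results]  # wait() only blocks; returns None
        got = [r.get() for r in async_results]      # get() returns fun's result
    return waited, got


waited, got = compare_wait_and_get([0, 1, 2, 3, 4])
print(waited)  # [None, None, None, None, None]
print(got)     # [0, 2, 4, 6, 8]
```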