【问题标题】:Launch concurrent.futures.ProcessPoolExecutor with initialization?启动 concurrent.futures.ProcessPoolExecutor 并初始化?
【发布时间】:2015-11-14 20:20:57
【问题描述】:

我打算使用concurrent.futures.ProcessPoolExecutor 来并行执行函数。根据documentation,它的executor对象只能接受map中的一个简单函数。我的实际情况涉及执行“待并行化”函数之前的初始化(加载数据)。我该如何安排?

“待并行化”函数在一次迭代中被多次调用。我不希望它每次都重新初始化。

换句话说,有一个init 函数会为这个 tbp 函数产生一些输出。每个孩子都应该有自己的输出副本,因为函数依赖于此。

【问题讨论】:

  • 你能创建一个例子来说明初始化的实际情况吗?您是否有理由不能只围绕要并行化的函数创建一个包装器来进行初始化,然后调用该函数?
  • 好吧,将初始化视为接近json.load,“待并行化”函数在迭代中被多次调用。我不希望它每次都重新初始化。
  • 啊,好的。并且初始化需要在子进程中运行,而不是父进程?
  • FWIW,有一个patch waiting for review,我认为这正是你想要的。
  • 感谢您的信息。

标签: python python-3.x concurrency multiprocessing subprocess


【解决方案1】:

听起来您正在寻找与multiprocessing.Pool 所采用的initializer/initargs 选项等效的选项。目前,concurrent.futures.ProcessPoolExecutor 不存在该行为,但有一个 patch waiting for review 添加了该行为。

因此,您可以使用 multiprocessing.Pool(这可能适合您的用例),等待该补丁被合并和发布(您可能要等待一段时间 :)),或者推出您自己的解决方案。事实证明,为 map 编写一个采用 initializer 的包装函数并不难,但每个进程只调用一个:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

inited = False
initresult = None

def initwrapper(initfunc, initargs, f, x):
    # This will be called in the child. inited
    # Will be False the first time its called, but then
    # remain True every other time its called in a given
    # worker process.
    global inited, initresult
    if not inited:
        inited = True
        initresult = initfunc(*initargs)
    return f(x)

def do_init(a,b):
    print('ran init {} {}'.format(a,b))
    return os.getpid() # Just to demonstrate it will be unique per process

def f(x):
    print("Hey there {}".format(x))
    print('initresult is {}'.format(initresult))
    return x+1

def initmap(executor, initializer, initargs, f, it):
    return executor.map(partial(initwrapper, initializer, initargs, f), it)


if __name__ == "__main__":
    with ProcessPoolExecutor(4) as executor:
        out = initmap(executor, do_init, (5,6), f, range(10))
    print(list(out))

输出:

ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

【讨论】:

  • 这可能是我的问题的措辞问题。但就我而言,函数f(x) 需要访问来自initfunc 的返回结果。每个孩子都应该有自己的副本。我在这个实现中看不到它的可能。
  • @HeShiming 我已经编辑了我的答案,以证明您可以与每个子函数共享initfunc 的结果,这种方式很容易转换为multiprocessing.Pool w/initializer夸格。
  • 但是嗯...initresult 是全球性的。就我而言,每个孩子都必须有一份单独的副本。而且我不确定它是否可以腌制,这不是真的......
  • @HeShiming 它是全局的每个子进程。看看我的回答中的输出,initresult 打印了四个不同的值,一个用于池中的每个进程。 initwrapper 只在子进程中运行,所以不需要picklable。
  • @HeShiming 是的,乍一看有点混乱。诀窍是initwrapper,因此do_init,只在子进程内部执行(意味着在对executor.map的调用内部)。那时,所有全局变量对每个孩子都是唯一的。 ProcessPoolExecutor 对象使每个子进程在Executor 的整个生命周期内保持活动状态(基本上只是使用while <is_pool_alive>: 循环),因此一旦在子进程中初始化全局,它将被重新用于每次致电f
【解决方案2】:

截至Python 3.7ThreadPoolExecutorProcessPoolExecutor 都有可选的initializerinitargs 参数。每个线程/进程启动后都会调用initializer(*initargs)

https://docs.python.org/3.7/library/concurrent.futures.html

【讨论】:

  • 如果文档显示initializer 的签名,那就太好了。它可以接收唯一的任务 ID 吗?
  • 是否可以将数据库连接作为initializer 的一部分进行初始化?插入数据时将其保留在进程中是有意义的。
猜你喜欢
  • 2019-07-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多