【问题标题】:how to use initializer to set up my multiprocess pool?如何使用初始化程序来设置我的多进程池?
【发布时间】:2012-04-24 09:58:00
【问题描述】:

我正在尝试使用多进程池对象。我希望每个进程在启动时打开一个数据库连接,然后使用该连接来处理传入的数据。(而不是为每一位数据打开和关闭连接。)这似乎是初始化程序对于,但我无法理解工作人员和初始化程序是如何通信的。所以我有这样的事情:

def get_cursor():
  return psycopg2.connect(...).cursor()

def process_data(data):
   # here I'd like to have the cursor so that I can do things with the data

if __name__ == "__main__":
  pool = Pool(initializer=get_cursor, initargs=())
  pool.map(process_data, get_some_data_iterator())

我如何(或我如何)将光标从 get_cursor() 取回 process_data()?

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    初始化函数是这样调用的:

    def worker(...):
        ...
        if initializer is not None:
            initializer(*args)
    

    所以在任何地方都没有保存返回值。你可能认为这注定了你的命运,但不是!每个工人都在一个单独的进程中。因此,您可以使用普通的global 变量。

    这不是很漂亮,但它确实有效:

    cursor = None
    def set_global_cursor(...):
        global cursor
        cursor = ...
    

    现在您可以在您的process_data 函数中使用cursor。每个单独进程中的cursor 变量与所有其他进程是分开的,因此它们不会相互踩踏。

    (我不知道psycopg2 是否有不同的方法来处理这个问题,首先不涉及使用multiprocessing;这是对multiprocessing 模块的一般问题的一般回答.)

    【讨论】:

    • @torek set_global_cursor 应该在 init_worker 中调用吗?
    • @TheUnfunCat:不知道init_worker 是什么(我在您的答案中看到了一个,但在原始问题中没有)我真的不能肯定地说。一般的想法是允许multiprocess.Pool 创建一个进程池,并让每个进程创建(它自己的私有副本)数据库连接。如果您希望在启动池进程时发生这种情况,请使用初始化函数。如果你想让它稍后发生,你可以稍后再做。无论哪种方式,您都需要一个持久变量,例如您的方法中的function.cursor,或者一个普通的global
    • 无论如何,我发现我和你的解决方案都很可怕而且有点神奇(我相信 pylint 也会抱怨)。我想知道是否有更pythonic的方式......
    • @Tarjintor:跨越文件边界不应该有问题,因为关键是这些是独立的进程(好像两个不同的人运行了两个不同的python <file> 命令) ,所以命名空间照常工作。我发现命名每个进程很有帮助:第一个(您运行的)是 Alice,第二个(Alice 启动的)是 Bob,依此类推。然后你可以说“Alice 的变量 X 设置为 3,Bob 的 X 设置为 42...”
    • 有效!这真的很棒,因为来自像 SWIG 之类的库的对象不能被腌制,这使得它可以工作,因为不需要腌制。它可以在我的 6 核 i5 上以 6 倍的速度运行 SentencePiece 之类的东西。谢谢!
    【解决方案2】:

    您还可以将函数发送到初始化程序并在其中创建连接。然后将光标添加到函数中。

    def init_worker(function):
        function.cursor = db.conn()
    

    现在你可以通过 function.cursor 访问 db 而无需使用全局变量,例如:

    def use_db(i):
        print(use_db.cursor) #process local
    pool = Pool(initializer=init_worker, initargs=(use_db,))
    pool.map(use_db, range(10))
    

    【讨论】:

    • 您的进程命令是否类似于:p = Pool(initializer=init_worker, args=(func)); p.map(func, args_set); ??
    • 是的,类似的东西(我记得这个工作,但有一段时间没有从事相关工作,所以不记得确切的细节,请随时 dv 或修改我的答案,)
    • 我喜欢这个答案,因为它不会为每个调用传递初始化参数。如果初始化器参数很大,那么我不希望它们在每次调用时都被腌制。
    • 这与在调用 Pool 之前附加光标有什么不同吗?是否因为.map() 只对函数进行一次腌制而起作用?
    • 我不明白这个答案。 SQL 逻辑会在哪里执行?
    【解决方案3】:

    torek 已经很好地解释了为什么初始化程序在这种情况下不起作用。但是,我个人不是全局变量的粉丝,所以我想在这里粘贴另一个解决方案。

    想法是使用一个类来包装函数并用“全局”变量初始化类。

    class Processor(object):
      """Process the data and save it to database."""
    
      def __init__(self, credentials):
        """Initialize the class with 'global' variables"""
        self.cursor = psycopg2.connect(credentials).cursor()
    
      def __call__(self, data):
        """Do something with the cursor and data"""
        self.cursor.find(data.key)
    

    然后调用

    p = Pool(5)
    p.map(Processor(credentials), list_of_data)
    

    所以第一个参数用凭证初始化类,返回一个类的实例,用数据映射调用实例。

    虽然这不像全局变量解决方案那样简单,但我强烈建议避免使用全局变量并以某种安全的方式封装变量。 (我真的希望他们有一天能支持 lambda 表达式,这会让事情变得更容易......)

    【讨论】:

    • 我喜欢这个答案,因为它很漂亮,但它不会为列表中的每个项目重新连接吗?
    • 避免全局变量 通常很好,您可以这样做,但您需要推迟初始化 self.cursor 直到 p.map 实际启动流程实例。也就是说,您的__init__ 只会将此设置为None,而__call__ 会说if self.cursor is None: self.cursor = ...。最后,我们真正需要的是每个进程的单例。
    • 这不会导致为每个任务重新运行初始化程序(池中的每个进程可能不止一次)?
    • 如果初始化比较耗时,这个答案基本是把初始化序列化了,是错误的答案。另外,有些时候初始化不能在一个进程中进行两次。
    • 此解决方案无法获得与使用全局变量相同的结果。每次map(...) 将任务从list_of_data 交给Processor.__call__() 时,整个Processor 对象都会被腌制,并作为第一个参数传递给__call__(self, data) b/c,它是一个实例方法。即使psycopg2.connection.Cursor() 对象是可腌制的,您也无法初始化任何变量,您只需腌制该对象,然后从子进程内__call__() 中的self 实例访问它。此外,如果Processor 上的任何对象很大,则此解决方案将慢到爬行。
    【解决方案4】:

    鉴于在初始化程序中定义全局变量通常是不可取的,我们可以避免使用它们,也可以避免在每个调用中重复昂贵的初始化,并在每个子进程中进行简单的缓存:

    from functools import lru_cache
    from multiprocessing.pool import Pool
    from time import sleep
    
    
    @lru_cache(maxsize=None)
    def _initializer(a, b):
        print(f'Initialized with {a}, {b}')
    
    
    def _pool_func(a, b, i):
        _initializer(a, b)
        sleep(1)
        print(f'got {i}')
    
    
    arg_a = 1
    arg_b = 2
    
    with Pool(processes=5) as pool:
        pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 20)))
    

    输出:

    Initialized with 1, 2
    Initialized with 1, 2
    Initialized with 1, 2
    Initialized with 1, 2
    Initialized with 1, 2
    got 1
    got 0
    got 4
    got 2
    got 3
    got 5
    got 7
    got 8
    got 6
    got 9
    got 10
    got 11
    got 12
    got 14
    got 13
    got 15
    got 16
    got 17
    got 18
    got 19
    

    【讨论】:

    • 这只会为您节省在初始化程序中扩展的计算。相反,如果您的初始化程序主要包括在主进程和工作进程之间传输大量数据,那么与上述解决方案相反,它对您没有帮助。
    【解决方案5】:

    如果你的第一个答案不清楚,这里是运行的 sn-p:

    import multiprocessing
    n_proc = 5
    cursor = [ 0 for _ in range(n_proc)]
    def set_global_cursor():
        global cursor
        cursor[multiprocessing.current_process()._identity[0]-1] = 1
    
    def process_data(data):
        print(cursor)
        return data**2
        
    pool = multiprocessing.Pool(processes=n_proc,initializer=set_global_cursor)
    pool.map(process_data, list(range(10))) 
    

    输出:

    [1, 0, 0, 0, 0]
    [0, 0, 1, 0, 0]
    [0, 1, 0, 0, 0]
    [0, 0, 1, 0, 0]
    [0, 0, 0, 0, 1]
    [1, 0, 0, 0, 0]
    [0, 0, 1, 0, 0]
    [0, 0, 1, 0, 0]
    [0, 0, 0, 1, 0]
    [0, 1, 0, 0, 0]
    

    【讨论】:

      猜你喜欢
      • 2020-12-13
      • 1970-01-01
      • 2014-10-12
      • 2021-10-07
      • 2022-07-06
      • 2012-01-27
      • 2011-08-14
      • 2011-08-27
      • 1970-01-01
      相关资源
      最近更新 更多