【问题标题】:Tracking progress of joblib.Parallel execution跟踪joblib.Parallel执行的进度
【发布时间】:2014-09-18 22:43:42
【问题描述】:

是否有一种简单的方法来跟踪 joblib.Parallel 执行的整体进度?

我有一个由数千个作业组成的长时间运行的执行,我想在数据库中对其进行跟踪和记录。但是,要做到这一点,每当 Parallel 完成任务时,我需要它执行回调,报告剩余的作业数。

我之前使用 Python 的 stdlib multiprocessing.Pool 完成了类似的任务,方法是启动一个线程来记录 Pool 的作业列表中待处理作业的数量。

看代码,Parallel继承了Pool,所以我想我可以实现同样的技巧,但它似乎没有使用这些列表,我一直无法弄清楚如何“阅读” " 任何其他方式都是内部状态。

【问题讨论】:

    标签: python multithreading parallel-processing multiprocessing joblib


    【解决方案1】:

    比 dano 和 Connor 的答案更进一步的是将整个事情包装为上下文管理器:

    import contextlib
    import joblib
    from tqdm import tqdm
    
    @contextlib.contextmanager
    def tqdm_joblib(tqdm_object):
        """Context manager to patch joblib to report into tqdm progress bar given as argument"""
        class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
            def __call__(self, *args, **kwargs):
                tqdm_object.update(n=self.batch_size)
                return super().__call__(*args, **kwargs)
    
        old_batch_callback = joblib.parallel.BatchCompletionCallBack
        joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
        try:
            yield tqdm_object
        finally:
            joblib.parallel.BatchCompletionCallBack = old_batch_callback
            tqdm_object.close()
    

    然后你可以像这样使用它,完成后不要留下猴子修补的代码:

    from joblib import Parallel, delayed
    
    with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
        Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
    

    我认为这太棒了,它看起来类似于 tqdm pandas 集成。

    【讨论】:

    • 这应该是最好的答案!谢谢
    • 优秀的解决方案。使用 joblib 0.14.1 和 tq​​dm 4.41.0 测试——效果很好。这将是 tqdm 的一个很好的补充!
    • 我无法编辑它,但解决方案中有轻微的错字,其中 joblib.parallel.BatchCompletionCallback 实际上是 BatchCompletionCallBack(注意 CallBack 上的驼峰式)
    【解决方案2】:

    为什么不能简单地使用tqdm?以下对我有用

    from joblib import Parallel, delayed
    from datetime import datetime
    from tqdm import tqdm
    
    def myfun(x):
        return x**2
    
    results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000))
    100%|██████████| 1000/1000 [00:00<00:00, 10563.37it/s]
    

    【讨论】:

    • 非常整洁。谢谢。
    • 我不认为这实际上是监视正在运行的作业的完成,只是作业的排队。如果您要在myfun 的开头插入time.sleep(1),您会发现tqdm 进度几乎立即完成,但results 需要几秒钟才能填充。
    • 是的,部分正确。它正在跟踪作业开始与完成情况,但另一个问题是在所有作业完成后还会出现由开销引起的延迟。完成所有任务后,需要收集结果,这可能需要很长时间。
    • 我相信这个答案并不能真正回答这个问题。如前所述,使用这种方法将跟踪 queuing 而不是 execution 本身。下面显示的回调方法似乎更准确地解决了这个问题。
    • 这个答案不正确,因为它没有回答问题。这个答案应该是不被接受的。
    【解决方案3】:

    您链接到的文档指出Parallel 有一个可选的进度表。它是通过使用multiprocessing.Pool.apply_async提供的callback关键字参数实现的:

    # This is inside a dispatch function
    self._lock.acquire()
    job = self._pool.apply_async(SafeFunction(func), args,
                kwargs, callback=CallBack(self.n_dispatched, self))
    self._jobs.append(job)
    self.n_dispatched += 1
    

    ...

    class CallBack(object):
        """ Callback used by parallel: it is used for progress reporting, and
            to add data to be processed
        """
        def __init__(self, index, parallel):
            self.parallel = parallel
            self.index = index
    
        def __call__(self, out):
            self.parallel.print_progress(self.index)
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()
    

    这里是print_progress

    def print_progress(self, index):
        elapsed_time = time.time() - self._start_time
    
        # This is heuristic code to print only 'verbose' times a messages
        # The challenge is that we may not know the queue length
        if self._original_iterable:
            if _verbosity_filter(index, self.verbose):
                return
            self._print('Done %3i jobs       | elapsed: %s',
                        (index + 1,
                         short_format_time(elapsed_time),
                        ))
        else:
            # We are finished dispatching
            queue_length = self.n_dispatched
            # We always display the first loop
            if not index == 0:
                # Display depending on the number of remaining items
                # A message as soon as we finish dispatching, cursor is 0
                cursor = (queue_length - index + 1
                          - self._pre_dispatch_amount)
                frequency = (queue_length // self.verbose) + 1
                is_last_item = (index + 1 == queue_length)
                if (is_last_item or cursor % frequency):
                    return
            remaining_time = (elapsed_time / (index + 1) *
                        (self.n_dispatched - index - 1.))
            self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                        (index + 1,
                         queue_length,
                         short_format_time(elapsed_time),
                         short_format_time(remaining_time),
                        ))
    

    老实说,他们实现这一点的方式有点奇怪 - 它似乎假设任务将始终按照它们开始的顺序完成。转到print_progressindex 变量只是作业实际开始时的self.n_dispatched 变量。因此,启动的第一个作业将始终以 0 的 index 结束,即使说第三个作业首先完成。这也意味着他们实际上并没有跟踪已完成 工作的数量。所以没有实例变量供您监控。

    我认为最好的办法是制作自己的 CallBack 类和猴子补丁 Parallel:

    from math import sqrt
    from collections import defaultdict
    from joblib import Parallel, delayed
    
    class CallBack(object):
        completed = defaultdict(int)
    
        def __init__(self, index, parallel):
            self.index = index
            self.parallel = parallel
    
        def __call__(self, index):
            CallBack.completed[self.parallel] += 1
            print("done with {}".format(CallBack.completed[self.parallel]))
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()
    
    import joblib.parallel
    joblib.parallel.CallBack = CallBack
    
    if __name__ == "__main__":
        print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
    

    输出:

    done with 1
    done with 2
    done with 3
    done with 4
    done with 5
    done with 6
    done with 7
    done with 8
    done with 9
    done with 10
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    

    这样,每当作业完成时都会调用您的回调,而不是默认的。

    【讨论】:

    • 伟大的研究,谢谢。我没有注意到回调属性。
    • 我发现joblib的文档非常有限。我必须深入研究这个 CallBack 类的源代码。我的问题:我可以在调用 __call__ 时自定义参数吗? (对整个 Parallel 类进行子分类可能是一种方法,但对我来说很重)。
    【解决方案4】:

    扩展 dano 对最新版本的 joblib 库的回答。内部实现有一些变化。

    from joblib import Parallel, delayed
    from collections import defaultdict
    
    # patch joblib progress callback
    class BatchCompletionCallBack(object):
      completed = defaultdict(int)
    
      def __init__(self, time, index, parallel):
        self.index = index
        self.parallel = parallel
    
      def __call__(self, index):
        BatchCompletionCallBack.completed[self.parallel] += 1
        print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
        if self.parallel._original_iterator is not None:
          self.parallel.dispatch_next()
    
    import joblib.parallel
    joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
    

    【讨论】:

      【解决方案5】:

      TLDR 解决方案

      使用 python 3.5 与 joblib 0.14.0 和 tq​​dm 4.46.0 一起使用。感谢 frenzykryger 提供 contextlib 建议,感谢 dano 和 Connor 提供猴子修补想法。

      import contextlib
      import joblib
      from tqdm import tqdm
      from joblib import Parallel, delayed
      
      @contextlib.contextmanager
      def tqdm_joblib(tqdm_object):
          """Context manager to patch joblib to report into tqdm progress bar given as argument"""
      
          def tqdm_print_progress(self):
              if self.n_completed_tasks > tqdm_object.n:
                  n_completed = self.n_completed_tasks - tqdm_object.n
                  tqdm_object.update(n=n_completed)
      
          original_print_progress = joblib.parallel.Parallel.print_progress
          joblib.parallel.Parallel.print_progress = tqdm_print_progress
      
          try:
              yield tqdm_object
          finally:
              joblib.parallel.Parallel.print_progress = original_print_progress
              tqdm_object.close()
      

      你可以像 frenzykryger 描述的那样使用它

      import time
      def some_method(wait_time):
          time.sleep(wait_time)
      
      with tqdm_joblib(tqdm(desc="My method", total=10)) as progress_bar:
          Parallel(n_jobs=2)(delayed(some_method)(0.2) for i in range(10))
      

      更长的解释:

      Jon 的解决方案实现起来很简单,但它只测量已调度的任务。如果任务耗时较长,则在等待最后一个分派的任务执行完毕时,进度条会一直卡在 100%。

      从 dano 和 Connor 改进而来的 frenzykryger 的上下文管理器方法更好,但在任务完成之前也可以使用 ImmediateResult 调用 BatchCompletionCallBack(参见 Intermediate results from joblib)。这将使我们的计数超过 100%。

      我们可以只修补Parallel 中的print_progress 函数,而不是猴子修补BatchCompletionCallBack。无论如何,BatchCompletionCallBack 已经调用了这个print_progress。如果设置了详细信息(即Parallel(n_jobs=2, verbose=100)),print_progress 将打印出已完成的任务,尽管不如 tqdm 好。查看代码,print_progress 是一个类方法,所以它已经有 self.n_completed_tasks 记录我们想要的数字。我们所要做的只是将其与 joblib 的当前进度状态进行比较,并仅在有差异时更新。

      这是使用 python 3.5 在 joblib 0.14.0 和 tq​​dm 4.46.0 中测试的。

      【讨论】:

        【解决方案6】:

        文字进度条

        对于那些想要文本进度条而不需要像 tqdm 这样的附加模块的人来说,还有一个变体。 Joblib=0.11 的实际值,Linux 上的 python 3.5.2 于 16.04.2018 并在子任务完成时显示进度。

        重新定义原生类:

        class BatchCompletionCallBack(object):
            # Added code - start
            global total_n_jobs
            # Added code - end
            def __init__(self, dispatch_timestamp, batch_size, parallel):
                self.dispatch_timestamp = dispatch_timestamp
                self.batch_size = batch_size
                self.parallel = parallel
        
            def __call__(self, out):
                self.parallel.n_completed_tasks += self.batch_size
                this_batch_duration = time.time() - self.dispatch_timestamp
        
                self.parallel._backend.batch_completed(self.batch_size,
                                                   this_batch_duration)
                self.parallel.print_progress()
                # Added code - start
                progress = self.parallel.n_completed_tasks / total_n_jobs
                print(
                    "\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50), progress*100)
                    , end="", flush=True)
                if self.parallel.n_completed_tasks == total_n_jobs:
                    print('\n')
                # Added code - end
                if self.parallel._original_iterator is not None:
                    self.parallel.dispatch_next()
        
        import joblib.parallel
        import time
        joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
        

        使用作业总数定义全局常量:

        total_n_jobs = 10
        

        这将导致如下结果:

        Progress: [########################################          ] 80.0%
        

        【讨论】:

        • 效果很好。如果您也想打印时间估计,您可以使用以下内容调整__call__:``` time_remaining = (this_batch_duration / self.batch_size) * (total_n_jobs - self.parallel.n_completed_tasks) print("\rProgress: [{0 :50s}] {1:.1f}% est {2:1f}mins left".format('#' * int(progress * 50), progress*100, time_remaining/60) , end="", flush=真)```
        【解决方案7】:

        这是您问题的另一个答案,语法如下:

        aprun = ParallelExecutor(n_jobs=5)
        
        a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
        a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
        a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
        a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
        

        https://stackoverflow.com/a/40415477/232371

        【讨论】:

          【解决方案8】:

          在 Jupyter 中,tqdm 每次输出时都会在输出中开始一个新行。 所以对于 Jupyter Notebook,它将是:

          用于 Jupyter 笔记本。 不睡觉:

          from joblib import Parallel, delayed
          from datetime import datetime
          from tqdm import notebook
          
          def myfun(x):
              return x**2
          
          results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in notebook.tqdm(range(1000)))  
          

          100% 1000/1000 [00:06

          随着时间.sleep:

          from joblib import Parallel, delayed
          from datetime import datetime
          from tqdm import notebook
          from random import randint
          import time
          
          def myfun(x):
              time.sleep(randint(1, 5))
              return x**2
          
          results = Parallel(n_jobs=7)(delayed(myfun)(i) for i in notebook.tqdm(range(100)))
          

          我目前使用的是什么而不是 joblib.Parallel:

          import concurrent.futures
          from tqdm import notebook
          from random import randint
          import time
          
          iterable = [i for i in range(50)]
          
          def myfun(x):
              time.sleep(randint(1, 5))
              return x**2
          
          def run(func, iterable, max_workers=8):
              with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                  results = list(notebook.tqdm(executor.map(func, iterable), total=len(iterable)))
              return results
          
          run(myfun, iterable)
          

          【讨论】:

          • 错了,这只会计算作业开始时间,无论您包装什么功能,它都会立即生效。
          • 官方文档怎么会出错呢? joblib.readthedocs.io/en/latest Ctrl+F for "Parallel(n_jobs=1)" 我的回答是关于在 Jupyter notebook 中运行 tqdm。它与接受的几乎相同。唯一的区别是它适用于 Jupyter notebook。
          • 我想我明白了。看起来你是对的。
          • 但是,它在 Jupyter notebook 中不是即时的。例如,14% 14/100 [00:05
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-05-17
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-03-08
          相关资源
          最近更新 更多