【问题标题】:Hung cells: running multiple jupyter notebooks in parallel with papermill(单元挂起:使用 papermill 并行运行多个 jupyter notebook)
【发布时间】:2021-04-10 18:43:46
【问题描述】:

我正在尝试通过从另一个笔记本启动它们来并行运行 jupyter 笔记本。我正在使用papermill 来保存笔记本的输出。

在我的 scheduler.ipynb 中,我使用的是 multiprocessing,有些人(some people)用这种方式取得了成功。我从主笔记本创建进程,第一次运行时似乎总能正常工作:我可以在 13 秒内运行 3 个各自 sleep 10 秒的笔记本。但如果后续的单元格再次运行完全相同的代码,它产生的进程(多个笔记本)就会无限期挂起。我尝试添加代码以确认生成的进程已有退出码并且已经结束,甚至在它们结束后再调用 terminate——都没有用,第二次运行永远不会完成。

如果我这样做:

sean@server:~$ ps aux | grep ipython 
root      2129  0.1  0.2 1117652 176904 ?      Ssl  19:39   0:05 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-eee374ff-0760-4490-8ed0-db03fed84f0c.json
root      3418  0.1  0.2 1042076 173652 ?      Ssl  19:42   0:03 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-3e2f09e8-969f-41c9-81cc-acd2ec4e3d54.json
root      4332  0.1  0.2 1042796 174896 ?      Ssl  19:44   0:04 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-bbd4575c-109a-4fb3-b6ed-372beb27effd.json
root     17183  0.2  0.2 995344 145872 ?       Ssl  20:26   0:02 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-27c48eb1-16b4-4442-9574-058283e48536.json

我看到似乎有 4 个正在运行的内核(4 个进程)。当我查看正在运行的笔记本时,我看到有 6 个正在运行的笔记本。文档似乎支持“少数几个内核可以为多个笔记本提供服务”这一点:“一个内核进程可以同时连接到多个前端”。

但是,我怀疑由于 ipython 内核仍在继续运行,派生出的子进程没有被回收(reap),从而导致了问题?有人说使用 multiprocessing 无法做到这一点(Some say it's not possible),也有人描述过同样的问题(Others have described the same problem)。

import re
import os
import multiprocessing

from os.path import isfile
from datetime import datetime

import papermill as pm
import nbformat

# avoid "RuntimeError: This event loop is already running"
# it seems papermill used to support this but it is now undocumented: 
#  papermill.execute_notebook(nest_asyncio=True)
import nest_asyncio
nest_asyncio.apply()

import company.config


# # Supporting Functions

# In[ ]:


def get_papermill_parameters(notebook,
                             notebook_prefix='/mnt/jupyter',
                             notebook_suffix='.ipynb'):
  """
  Derive the papermill input/output paths for one notebook specification.

  As a side effect, the ``scheduler/`` directory next to the notebook is
  created if it does not exist yet.

  Args:
    notebook (str | list | tuple): either a notebook path relative to
      notebook_prefix, or a sequence ``[path, parameters, tag]``. Missing
      trailing elements are treated as None.
    notebook_prefix (str): filesystem root that notebook paths are appended to
    notebook_suffix (str): file extension appended to the input path when the
      notebook path does not already end with it

  Returns:
    tuple: ``(input_notebook, output_notebook, this_notebook_suffix,
    parameters, tag)`` where

      * input_notebook is the full path of the notebook to execute,
      * output_notebook is ``<prefix><dir>scheduler/<basename>`` (no timestamp
        or suffix is added here),
      * this_notebook_suffix is notebook_suffix, or '' when the path already
        ends with it,
      * parameters is the second element of the sequence form, else None,
      * tag is ``'_' + <third element>``, or '' when no tag was given.
  """
  if isinstance(notebook, (list, tuple)):
    # Pad so that [path] and [path, parameters] are accepted as well.
    notebook_path, parameters, raw_tag = (list(notebook) + [None, None])[:3]
    # Always return a string: callers concatenate the tag into a file name,
    # so returning None for "no tag" would raise a TypeError there.
    tag = '_' + raw_tag if raw_tag is not None else ''
  else:
    notebook_path = notebook
    parameters = None
    tag = ''

  basename = os.path.basename(notebook_path)
  # Plain string operations instead of regexes: file names routinely contain
  # regex metacharacters ('.', '+', '(' ...) which must be taken literally.
  dirpath = notebook_path[:len(notebook_path) - len(basename)]
  this_notebook_suffix = '' if basename.endswith(notebook_suffix) else notebook_suffix

  input_notebook = notebook_prefix + notebook_path + this_notebook_suffix
  scheduler_notebook_dir = notebook_prefix + dirpath + 'scheduler/'

  # exist_ok avoids the check-then-create race when several runs start at once.
  os.makedirs(scheduler_notebook_dir, exist_ok=True)

  output_notebook = scheduler_notebook_dir + basename

  return input_notebook, output_notebook, this_notebook_suffix, parameters, tag


# In[ ]:


def add_additional_imports(input_notebook, output_notebook, current_datetime):
  """
  Write a copy of a notebook with an extra import cell prepended.

  The copy is written to ``<output dir>/<current_datetime>/temp/<name>`` and
  is meant to be executed instead of the original. The copy's notebook-level
  metadata is replaced by a fixed "pyspark" kernelspec; the original
  notebook's metadata is not carried over.

  Args:
    input_notebook (str): path of the notebook to read
    output_notebook (str): output path as computed for the run; only its
      directory and file name are used
    current_datetime (str): timestamp used as the name of the run directory

  Returns:
    tuple: ``(updated_notebook, output_notebook)`` - the path of the temporary
    copy to execute, and ``<output dir>/<current_datetime>/<name>`` where the
    executed result should be saved.
  """
  notebook_name = os.path.basename(output_notebook)
  # Strip only the trailing file name. A regex substitution would treat the
  # name as a pattern and would also remove it from any directory component
  # that happens to contain it.
  notebook_dir = output_notebook[:len(output_notebook) - len(notebook_name)]
  results_dir = notebook_dir + current_datetime + '/'
  temp_dir = results_dir + 'temp/'

  # temp_dir lives inside results_dir, so a single call creates both;
  # exist_ok tolerates another notebook of the same run creating it first.
  os.makedirs(temp_dir, exist_ok=True)

  updated_notebook = temp_dir + notebook_name
  first_cell = nbformat.v4.new_code_cell("""
    import import_ipynb
    import sys
    sys.path.append('/mnt/jupyter/lib')""")

  metadata = {"kernelspec": {"display_name": "PySpark", "language": "python", "name": "pyspark"}}
  existing_nb = nbformat.read(input_notebook, nbformat.current_nbformat)
  cells = existing_nb.cells
  cells.insert(0, first_cell)
  new_nb = nbformat.v4.new_notebook(cells=cells, metadata=metadata)
  nbformat.write(new_nb, updated_notebook, nbformat.current_nbformat)
  output_notebook = results_dir + notebook_name

  return updated_notebook, output_notebook


# In[ ]:


# define this function so it is easily passed to multiprocessing
def run_papermill(input_notebook, output_notebook, parameters):
  """
  Execute one notebook with papermill, logging cell output as it runs.

  Kept as a plain module-level function so it can be handed to
  multiprocessing.Process as a target.

  Args:
    input_notebook (str): path of the notebook to execute
    output_notebook (str): path the executed notebook is written to
    parameters (dict | None): papermill parameters for the notebook
  """
  pm.execute_notebook(
    input_path=input_notebook,
    output_path=output_notebook,
    parameters=parameters,
    log_output=True,
  )


# # Run All of the Notebooks

# In[ ]:


def run(notebooks, run_hour_utc=10, scheduler=True, additional_imports=False,
        parallel=False, notebook_prefix='/mnt/jupyter'):
  """
  Run provided list of notebooks on a schedule or on demand.

  Failures of individual notebooks are printed and do not stop the remaining
  notebooks from running.

  Args:
    notebooks (list): a list of notebooks to run; each entry is a path or a
                      [path, parameters, tag] list
    run_hour_utc (int): hour (UTC) to run notebooks at
    scheduler (boolean): when set to True (default value) notebooks will run at run_hour_utc.
                         when set to False notebooks will run on demand.
    additional_imports (boolean): set to True if you need to add additional imports into your notebook
    parallel (boolean): whether to run the notebooks in parallel
    notebook_prefix (str): path to jupyter notebooks

  Raises:
    ValueError: if parallel is True and more than 10 notebooks are given.
  """
  # Function-scope imports keep this block self-contained.
  import shutil
  from datetime import timezone

  # Only run once a day on an hourly cron job. Compare against the UTC hour,
  # as the parameter name and the message promise, not the server-local hour.
  if scheduler and datetime.now(timezone.utc).hour != run_hour_utc:
    print(f"Waiting until {run_hour_utc}:00:00 UTC to run.")
    return

  # Validate before doing any work so nothing is half-started.
  if parallel and len(notebooks) > 10:
    raise ValueError(f"You are trying to run {len(notebooks)}. We recommend a maximum of 10 be run at once.")

  now = datetime.today().strftime('%Y-%m-%d_%H%M%S')
  procs = []
  temp_dirs = set()
  notebooks_base_url = company.config.cluster['resources']['daedalus']['notebook'] + '/notebooks'

  for notebook in notebooks:
    input_notebook, output_notebook, this_notebook_suffix, parameters, tag = get_papermill_parameters(notebook, notebook_prefix)

    # Check existence first: the steps below read the notebook file.
    if not os.path.isfile(input_notebook):
      print(f"ERROR! Notebook file does not exist: '{input_notebook}'")
      continue

    if is_interactive_notebook(input_notebook):
      print(f"Not running Notebook '{input_notebook}' because it's marked interactive-only.")
      continue

    if additional_imports:
      input_notebook, output_notebook = add_additional_imports(input_notebook, output_notebook, now)
      # Remember the temp directory; it is removed once every run has finished.
      temp_dirs.add(os.path.dirname(input_notebook))
    else:
      # `tag or ''` tolerates a missing tag being reported as None.
      output_notebook = output_notebook + (tag or '') + '_' + now + this_notebook_suffix

    print(f"Running Notebook: '{input_notebook}'")
    print(" - Parameters: " + str(parameters))
    print(f"Saving Results to: '{output_notebook}'")
    # Literal replacement: the prefix is a path, not a regex pattern.
    print("Link: " + output_notebook.replace(notebook_prefix, notebooks_base_url))

    try:
      if parameters is not None:
        # Build a new dict instead of mutating the caller's parameters.
        parameters = dict(parameters, input_notebook=input_notebook, output_notebook=output_notebook)
      if parallel:
        proc = multiprocessing.Process(target=run_papermill, args=(input_notebook, output_notebook, parameters))
        print("starting process")
        proc.start()
        procs.append(proc)
      else:
        run_papermill(input_notebook, output_notebook, parameters)
    except Exception as ex:
      # Best effort: report the failure and carry on with the next notebook.
      print(ex)
      print(f"ERROR! See full error in: '{output_notebook}'\n\n")

  if procs:
    print("joining")
    for proc in procs:
      proc.join()

    print("terminating")
    for proc in procs:
      print(proc.is_alive())
      print(proc.exitcode)
      proc.terminate()

  # Remove temp notebooks only after all runs (including child processes) are
  # done; deleting right after proc.start() races with the child reading them.
  for temp_dir in temp_dirs:
    shutil.rmtree(temp_dir, ignore_errors=True)

  print(f"Done: Processed all {len(notebooks)} notebooks.")

我正在使用 python==3.6.12,papermill==2.2.2

jupyter core     : 4.7.0
jupyter-notebook : 5.5.0
ipython          : 7.16.1
ipykernel        : 5.3.4
jupyter client   : 6.1.7
ipywidgets       : 7.2.1

【问题讨论】:

  • 可能不相关,但您是否查看过ipyparallel?链接~>ipyparallel.readthedocs.io/en/latest
  • 是否有必要从另一个笔记本运行这些笔记本?我认为最简单的方法是编写一个 bash 脚本来并行执行笔记本(或不按要求执行)。您还可以使用 cron 作业来安排脚本。

标签: python multiprocessing jupyter papermill


【解决方案1】:

您是否尝试过使用subprocess 模块?对您来说,这似乎是一个更好的选择,而不是多处理。它允许您异步生成将并行运行的子进程,这可用于调用命令和程序,就像您使用 shell 一样。我发现编写 python 脚本而不是 bash 脚本真的很有用。

因此,您可以在主笔记本中通过 subprocess 模块(例如 subprocess.run(your_function_with_papermill))把其他笔记本作为独立的子进程并行运行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-10-23
    • 1970-01-01
    • 2018-01-08
    • 1970-01-01
    • 1970-01-01
    • 2019-06-26
    • 2022-07-20
    • 2017-08-30
    相关资源
    最近更新 更多