【问题标题】:Calling scheduler.multiprocessing.get in a separate process in dask在 dask 的单独进程中调用 scheduler.multiprocessing.get
【发布时间】:2018-12-05 14:01:15
【问题描述】:

我正在训练一个具有大型文本语料库的神经网络。每个文本都会生成一个很大的矩阵,因为我使用的是卷积模型。由于我的数据不会在我仍然很大的内存中,我尝试流式传输它,并使用keras.models fit_generator

为了喂 keras,我有一个由不同预处理步骤组成的管道,我用一个带有很多分区的 dask bag 来安排它。 dask bag 读取磁盘上的文件。

即使是 dask 也没有以聪明的方式处理迭代(它只是计算()和迭代结果,在我的情况下会炸毁内存),我打算使用这样的东西:

def compute_partition_iter(collection, **kwargs):
    """A utility to compute a collection items after items
    """
    get = kwargs.pop("get", None) or _globals['get']
    if get is None:
        get = collection.__dask_scheduler__
    postcompute_func, postcompute_args = collection.__dask_postcompute__()
    dsk = collection.__dask_graph__()
    for key in collection.__dask_keys__():
        yield from f([partition], *args)

这会逐个计算分区并返回项目,当我们跨越分区边界时计算下一个分区。

这种方法有一个问题:只有当我们从分区中击中最后一个项目时,我们才会激发下一个元素的计算,从而导致到下一个元素的延迟。在这种滞后中,keras 停滞不前,我们失去了宝贵的时间!

所以我想在一个单独的进程中运行上面的compute_partition_iter,这要归功于multiprocessing.Pool,在Queue 中提供分区,比如 2 个插槽,这样在生成器中,我不会总是准备好一个分区.

但是dask.bag似乎不支持这个。我没有深入研究代码,但似乎使用了一些异步方法,或者我不知道是什么。

这是解决问题的可重现代码。

首先是一个有效的代码,使用一个简单的范围。

import multiprocessing
import time


def put_q(n, q):
    for i in range(n):
        print(i, "<-")
        q.put(i)
    q.put(None)

q = multiprocessing.Queue(2)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

这个输出

0 <-
1 <-
2 <-
zzz
3 <-
->  0
zzz
->  1
zzz
->  2
zzz
->  3
zzz

您可以看到,正如预期的那样,元素在预期中计算并且一切正常。

现在让我们用dask.bag替换范围:

import multiprocessing
import time

import dask.bag


def put_q(n, q):
    for i in dask.bag.from_sequence(range(n), npartitions=2):
        print(i, "<-")
        q.put(i)
    q.put(None)

q = multiprocessing.Queue(5)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

在一个 jupyter notebook 中,它会无限期地提升:

Process ForkPoolWorker-71:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 103, in worker
    initializer(*initargs)
  File "<ipython-input-3-e1e9ef9354a0>", line 8, in put_q
    for i in dask.bag.from_sequence(range(n), npartitions=2):
  File "/usr/local/lib/python3.5/dist-packages/dask/bag/core.py", line 1190, in __iter__
    return iter(self.compute())
  File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 154, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 407, in compute
    results = get(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py", line 152, in get
    initializer=initialize_worker_process)
  File "/usr/lib/python3.5/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 168, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 233, in _repopulate_pool
    w.start()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

当主进程停止时,等待队列中的元素。

我也尝试使用 ipyparallel 集群,但在这种情况下,主进程只是停止了(没有异常痕迹)。

有谁知道这样做的正确方法吗?

有没有办法可以让 scheduler.get 与我的主代码并行运行?

【问题讨论】:

    标签: python-multiprocessing dask


    【解决方案1】:

    最后我应该仔细看看异常!

    Stackoverflow 给了我解决方案:Python Process Pool non-daemonic?

    事实上,由于包调度程序使用 Pool,它不能在由 pool 产生的进程内调用。在我的情况下,解决方案是简单地使用线程。 (注意该错误及其解决方案取决于您使用的调度程序)。

    所以我用 multiprocessing.Pool 代替了 multiprocessing.pool.ThreadPool,它的工作原理就像一个魅力,无论是在普通笔记本中,还是在 using ipyparallel 时。

    原来是这样的:

    import queue
    from multiprocessing.pool import ThreadPool
    import time
    
    import dask.bag
    
    
    def put_q(n, q):
        b = dask.bag.from_sequence(range(n), npartitions=3)
        for i in b:
            print(i, "<-")
            q.put(i)
        q.put(None)
    
    q = queue.Queue(2)
    with ThreadPool(1, put_q, (6, q)) as pool:
        i = True
        while i is not None:
            print("zzz")
            time.sleep(.5)
            i = q.get()
            if i is None:
                break
            print("-> ", i)
    

    哪些输出:

    zzz
    0 <-
    1 <-
    2 <-
    ->  0
    zzz
    3 <-
    ->  1
    zzz
    4 <-
    -> 5 <-
     2
    zzz
    ->  3
    zzz
    ->  4
    zzz
    ->  5
    zzz
    

    【讨论】:

      猜你喜欢
      • 2017-06-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-13
      • 2015-05-20
      • 2021-06-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多