【问题标题】:Finding Length of Dask dataframe查找 Dask 数据帧的长度
【发布时间】:2021-01-03 16:12:14
【问题描述】:

我正在尝试使用 len(dataframe[column]) 查找 dask 数据帧的长度,但每次我尝试执行此操作时都会出错:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
Traceback (most recent call last):
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\queues.py", line 238, in _feed
    send_bytes(obj)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
distributed.nanny - ERROR - Nanny failed to start process
Traceback (most recent call last):
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\nanny.py", line 575, in start
    await self.process.start()
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\process.py", line 34, in _call_and_set_future
    res = func(*args, **kwargs)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\process.py", line 202, in _start
    process.start()
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 89, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py", line 948, in reduce_pipe_connection
    dh = reduction.DupHandle(conn.fileno(), access)
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py", line 170, in fileno
    self._check_closed()
  File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting

我的 dask 数据框有 1000 万行。有什么办法可以解决这个错误。

【问题讨论】:

  • 请提供一个最小的、可重现的例子。考虑到集群的内存限制,您可能需要为数据帧使用更多分区。

标签: python dask dask-dataframe


【解决方案1】:

我觉得找到列的长度不会那么简单,因为 Dask 可能正在从各种来源构建数据框 - 类似于为什么您可以在数据框上获得 .head(),但需要做一些额外的事情做.tail()

由于您使用的是这么大的数据框,我相信 Python 会将 len() 中的任何内容加载到内存中。

我有两个建议,但我不完全确定它们不会触发相同的异常。

使用pipe

让我们看看这是否可行,您可以尝试在您的专栏中使用pipe 并将len 传递给它,也许这可以工作。

dataframe["column"].pipe(len)

参考pipe documentation

分区

我认为它可能会有所帮助的一件事是,如果您将列分区为块,这可能有助于保持较低的内存使用率,唯一的问题是您必须对这些分区的大小做一些访客工作会的。

您必须跟踪的另一件事是每个分区的长度,这可能有点混乱,我觉得必须有更好的方法来做到这一点。

length = 0

len += dataframe["column"].partitions[:10000]
len += dataframe["column"].partitions[:20000]

当然,您可以尝试使用循环来使代码更简洁。

这里是dataframe.partitions的文档供参考

请让我知道这些是否有效,我希望我能帮助你。

【讨论】:

  • df.partitions[:20000] 可能没有做你认为它正在做的事情。这将返回 dask 数据帧的前 20,000 个分区。这不是长度或行数,也不是整数。
  • 谢谢你的尼克,我不确定这是否是一种方法,我认为管道或 dask.delayed 可能是获得长度的唯一方法列?
  • “我相信 Python 会将 len() 中的任何内容加载到内存中”——不,这通常不是真的
猜你喜欢
  • 2021-09-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多