【问题标题】:Understanding memory behavior of Dask distributed了解 Dask 分布式的内存行为
【发布时间】:2017-06-06 23:47:32
【问题描述】:

类似于this question,我遇到了分布式Dask 的内存问题。然而,在我的例子中,解释并不是客户端试图收集大量数据。

这个问题可以基于一个非常简单的任务图来说明:delayed 操作列表会生成一些固定大小约为 500 MB 的随机数据帧(以模拟从文件加载许多分区)。任务图中的下一个操作是获取每个 DataFrame 的大小。最后,所有尺寸都缩小为一个总尺寸,即必须返回给客户端的数据很小。

出于测试目的,我正在运行一个本地调度程序/工作程序单线程,限制为 2GB 内存,即:

$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000

我对任务图的期望是,worker 永远不需要超过 500 MB 的 RAM,因为在 “生成数据”之后直接运行 “获取数据大小” > 应该立即使数据变小。但是,我观察到工作人员需要更多的内存:

因子 2 表示必须在内部复制数据。因此,任何使分区大小接近节点物理内存的尝试都会导致MemoryErrors 或大量交换。

非常感谢任何能对此有所了解的信息。特别是:

  • 我是否可以控制数据重复,是否可以避免?还是一般的经验法则是让有效负载远低于 50% 以解决数据重复问题?
  • 工作人员memory-limit 如何影响此行为?从我的测试来看,使用较低的阈值似乎更早触发 GC(和/或溢出到磁盘?),但另一方面,还有其他内存峰值甚至超过了使用较高阈值的峰值内存。

请注意,我知道我可以通过在第一次操作中获取大小 within 来解决此特定问题,并且可能 Dask 的单机执行器更适合该问题,但我在问用于教育目的。


附件一:测试代码

from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor


def simulate_df_partition_load(part_id):
    """
    Creates a random DataFrame of ~500 MB
    """
    num_rows = 5000000
    num_cols = 13

    df = pd.DataFrame()
    for i in xrange(num_cols):
        data_col = np.random.uniform(0, 1, num_rows)
        df["col_{}".format(i)] = data_col
        del data_col    # for max GC-friendliness

    print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
        part_id, df.shape[0], df.shape[1],
        df.memory_usage().sum() / (2 ** 20)
    ))
    return df


e = Executor('127.0.0.1:8786', set_as_default=True)

num_partitions = 2

lazy_dataframes = [
    delayed(simulate_df_partition_load)(part_id)
    for part_id in xrange(num_partitions)
]

length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)

length_total = dag.compute()

附件 2: DAG 插图

【问题讨论】:

  • 你是如何得到显示内存的图表的?加上图表是每个工人还是所有工人?
  • @AmyChodorowski 这只是来自 Dask 监控仪表板,我认为它仅指一名工人。请注意,这个问题是几年前的问题,也许 Dask 监控仪表板已经发生了一点变化。

标签: python dask dask-delayed


【解决方案1】:

这里有几个问题:

  1. 为什么我看到的内存使用量是单个数据元素的两倍?
  2. 建议的行为是将分区大小保持在远低于总内存的水平吗?
  3. 当我超出 --memory-limit 值时会发生什么

为什么我看到的内存使用量是原来的两倍?

worker 在执行第一个计算大小任务之前可能正在运行两个创建数据任务。这是因为调度程序将所有当前可运行的任务分配给工作人员,可能超过他们一次可以运行的任务。工作人员完成第一个并向调度程序报告。当调度程序决定向工作人员发送什么新任务(计算大小的任务)时,工作人员会立即启动另一个创建数据任务。

建议的做法是保持分区大小远低于总内存吗?

是的。

当我超出 --memory-limit 值时会发生什么?

worker 将开始将最近最少使用的数据元素写入磁盘。默认情况下,当您的内存使用率约为 60% 时(由__sizeof__ 协议测量),它会执行此操作。

注意:感谢您提出的好问题

【讨论】:

  • 非常感谢,这很清楚。我向生成器 + 映射器添加了一些日志记录,我可以确认这正是我得到的行为:前两个任务是生成器任务,从那里开始,当使用两个以上分区时,映射器和生成器任务是交替的。
猜你喜欢
  • 2016-11-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-13
  • 2019-07-08
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多