【问题标题】:Dask graph execution and memory usageDask 图执行和内存使用
【发布时间】:2017-06-06 23:38:12
【问题描述】:

我正在构建一个非常大的 DAG 以提交给分布式调度程序,其中节点对本身可能非常大的数据帧进行操作。一种模式是我有大约 50-60 个函数来加载数据并构建每个数百 MB 的 pandas 数据帧(并在逻辑上表示单个表的分区)。我想将这些连接到图中下游节点的单个 dask 数据帧中,同时最大限度地减少数据移动。我像这样链接任务:

dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)

在哪里

def pandas_to_dask(df):
    return dask.dataframe.from_pandas(df).to_delayed()

我已经尝试了各种concat_all 实现,但这似乎是合理的:

def concat_all(dfs):
    dfs = [dask.dataframe.from_delayed(df) for df in dfs]
    return dask.dataframe.multi.concat(dfs, axis='index', join='inner')

所有 pandas 数据帧的索引都是不相交的,并且是有序的/单调的。

但是,尽管每个人的内存预算实际上相当大而且我不希望它是四处移动数据。我有理由确定,在使用 dask 数据框的图形节点中调用 compute() 之前,我总是切分到合理的数据子集。

到目前为止,我正在玩--memory-limit,但没有成功。我至少正确地解决了这个问题吗?有没有我遗漏的注意事项?

【问题讨论】:

    标签: python dask dask-delayed


    【解决方案1】:

    鉴于您计算到熊猫数据帧的延迟值列表

    >>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
    >>> type(dfs[0].compute())  # just checking that this is true
    pandas.DataFrame
    

    将它们传递给dask.dataframe.from_delayed 函数

    >>> ddf = dd.from_delayed(dfs)
    

    默认情况下,这将运行第一个计算以确定元数据(对于 dask.dataframe 很重要的列名、dtypes 等)。您可以通过构建示例数据框并将其传递给 meta= 关键字来避免这种情况。

    >>> meta = pd.DataFrame({'value': [1.0], 'name': ['foo'], 'id': [0]})
    >>> ddf = dd.from_delayed(dfs, meta=meta)
    

    这个example notebook 也可能有帮助。

    通常,您永远不需要从其他 dask 函数中调用 dask 函数(就像您通过延迟 from_pandas 调用所做的那样)。 Dask.dataframe 函数本身已经是惰性的,不需要进一步延迟。

    【讨论】:

    • 感谢您的快速回复。我观察到 dd.from_delayed(dfs) 立即评估 dfs[0] 以提取元数据。出于某种原因,这给我带来了问题。是否有另一种方法可以将这种评估推迟到完全构建图表?我会尝试制作一个复制品。
    • 您可以为meta= 关键字提供示例数据框。我将在答案中添加一个示例。
    猜你喜欢
    • 2021-06-08
    • 1970-01-01
    • 1970-01-01
    • 2020-05-08
    • 2017-06-08
    • 1970-01-01
    • 1970-01-01
    • 2022-09-27
    • 1970-01-01
    相关资源
    最近更新 更多