【问题标题】:dask computation not executing in paralleldask 计算未并行执行
【发布时间】:2016-02-19 22:31:27
【问题描述】:

我有一个 json 文件目录,我正在尝试将其转换为 dask DataFrame 并将其保存到 castra。 它们之间有 200 个包含 O(10**7) json 记录的文件。 代码非常简单,主要遵循教程示例。

import dask.dataframe as dd
import dask.bag as db
import json
txt = db.from_filenames('part-*.json')
js = txt.map(json.loads)
df = js.to_dataframe()
cs=df.to_castra("data.castra")

我在 32 核机器上运行它,但代码仅 100% 使用一个核。 我对文档的理解是这段代码是并行执行的。 为什么不是? 我是不是误会了什么?

【问题讨论】:

    标签: python concurrency python-multiprocessing dask castra


    【解决方案1】:

    您的最终集合是一个 dask 数据帧,默认情况下使用线程,您必须明确告诉 dask 使用进程。

    您可以全局执行此操作

    import dask
    dask.config.set(scheduler='multiprocessing')
    

    或者只在to_castra 电话上执行此操作

    df.to_castra("data.castra", scheduler='multiprocessing')
    

    另外,作为一个警告,Castra 主要是一个实验。它速度相当快,但也不像 HDF5 或 Parquet 那样成熟。

    【讨论】:

    • 谢谢,但它对我不起作用,df.to_castra 不采用get 并且dask.set_options 方法似乎没有任何效果。我知道 Castra 是实验性的,但它似乎非常适合我的许多用例。我经常有中等大小的数据,我希望能够将它们用作具有快速保存和加载时间的 DataFrame。 Spark 或 HDFS 是 ovekill 和 Pandas 并没有安静地延伸那么远。
    • 啊,是的,你确实是对的。看起来我们故意将单核调度程序硬编码到 to_castra 中。我认为这是因为我们遇到了糟糕的内存性能,因为中间结果会随着数据准备好但磁盘跟不上而堆积起来。不过,这应该是可选的。我已经在 github.com/dask/dask/commit/…@ 的 master 中修复了一个快速修复
    • 谢谢!我试过了。它确实使代码并行运行,但最终崩溃。还有一些其他问题。我把细节写在 CL 的注释里。
    • dask 较新版本的语法现在是:import dask.multiprocessing dask.config.set(scheduler=dask.multiprocessing.get)
    猜你喜欢
    • 2021-05-13
    • 1970-01-01
    • 2018-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-19
    • 2021-11-05
    • 2018-11-28
    相关资源
    最近更新 更多