【问题标题】:dask bag not using all cores? alternatives?大包不使用所有核心?备择方案?
【发布时间】:2016-03-08 13:13:53
【问题描述】:

我有一个 python 脚本,它执行以下操作: 一世。它接受数据的输入文件(通常是嵌套的 JSON 格式) ii.将数据逐行传递给另一个函数,该函数将数据处理为所需格式 iii.最后将输出写入文件。

这是我当前执行此操作的简单 python 行...

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

for line in f:
    components.append(manipulate(ujson.loads(line)))
    write_to_csv(components)`

这是可行的,但是由于 python GIL 将其限制为服务器上的一个核心,因此速度非常慢,尤其是在处理大量数据的情况下。

我通常处理的数据量大约是 4 gigs gzip 压缩,但有时我必须处理数百 gigs gzip 压缩的数据。它不一定是大数据,但仍然无法全部在内存中处理,并且使用 Python 的 GIL 处理速度非常慢。

在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 在当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定试一试。

在对 dask 以及如何使用它进行了大量研究之后,我编写了一个非常小的脚本来复制我当前的流程。脚本如下所示:

import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`

这可以工作并产生与我原来的非 dask 脚本相同的结果,但它仍然只使用服务器上的一个 CPU。所以,它根本没有帮助。事实上,它更慢。

我做错了什么?我错过了什么吗?我对 dask 还是很陌生,所以让我知道我是否忽略了某些东西,或者我是否应该做一些完全不同的事情。

另外,是否有任何替代 dask 来使用服务器的全部容量(即所有 CPU)来完成我需要做的事情?

谢谢,

T

【问题讨论】:

  • 嗯从未听说过dask,真的很有趣,谢谢。你看过盒子标准的multiprocessing吗?这很简单(主义),但很有效。
  • 您可能想在Blaze mailing list 上提问。 Dask 相对较新且不断变化,据我所见,关于它的 StackOverflow 问题只有 20 个,因此可能没有多少人在这里看到您的问题并且知道足够的帮助。
  • FWIW,我订阅了这个标签,所以总是有人在看。 Stackoverflow 是解决此类问题的好地方。

标签: python json parallel-processing export-to-csv dask


【解决方案1】:

这里的问题在于dask.dataframe.to_csv,它迫使您进入单核模式。

我建议使用dask.bag 进行读取和操作,然后并行转储到一堆 CSV 文件。转储到多个 CSV 文件比转储到单个 CSV 文件更容易协调。

import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute()

尝试并行读取单个 GZIP 文件也可能会出现问题,但以上内容应该可以帮助您入门。

【讨论】:

  • 感谢@MRocklin!它没有用。大声笑...但是后来我将输入文件拆分为多个块并且它起作用了。似乎它只使用与输入文件数量一样多的 CPU。是否有计划使此功能动态化,以便您可以传入一个输入文件,然后包将拆分它并在后台并行处理它?
  • dask.bag 现在可以做到这一点,只是不完美。这里一个可能的问题是 GZIP 对随机访问的支持很差。
【解决方案2】:

似乎袋子的平行度取决于它们拥有的分区数量。

对我来说,跑步

mybag=bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions

产量

1746

这解决了问题并使处理完全可并行化。

【讨论】:

    【解决方案3】:

    如果您提供基于 glob 的文件名,例如MyFiles-*.csv dask dataframe.to_csv() 你应该能够将数据帧输出到磁盘。 它将创建多个文件而不是 1 个大型 csv 文件。请参阅此线程以获取更多 https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ

    MyFiles-0001.csv  
    MyFiles-0002.csv 
    ....
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-08-02
      • 2016-08-18
      • 2019-12-05
      相关资源
      最近更新 更多