【发布时间】:2016-03-08 13:13:53
【问题描述】:
我有一个 python 脚本,它执行以下操作: 一世。它接受数据的输入文件(通常是嵌套的 JSON 格式) ii.将数据逐行传递给另一个函数,该函数将数据处理为所需格式 iii.最后将输出写入文件。
这是我当前执行此操作的简单 python 行...
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
这是可行的,但是由于 python GIL 将其限制为服务器上的一个核心,因此速度非常慢,尤其是在处理大量数据的情况下。
我通常处理的数据量大约是 4 gigs gzip 压缩,但有时我必须处理数百 gigs gzip 压缩的数据。它不一定是大数据,但仍然无法全部在内存中处理,并且使用 Python 的 GIL 处理速度非常慢。
在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 在当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定试一试。
在对 dask 以及如何使用它进行了大量研究之后,我编写了一个非常小的脚本来复制我当前的流程。脚本如下所示:
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
这可以工作并产生与我原来的非 dask 脚本相同的结果,但它仍然只使用服务器上的一个 CPU。所以,它根本没有帮助。事实上,它更慢。
我做错了什么?我错过了什么吗?我对 dask 还是很陌生,所以让我知道我是否忽略了某些东西,或者我是否应该做一些完全不同的事情。
另外,是否有任何替代 dask 来使用服务器的全部容量(即所有 CPU)来完成我需要做的事情?
谢谢,
T
【问题讨论】:
-
嗯从未听说过
dask,真的很有趣,谢谢。你看过盒子标准的multiprocessing吗?这很简单(主义),但很有效。 -
您可能想在Blaze mailing list 上提问。 Dask 相对较新且不断变化,据我所见,关于它的 StackOverflow 问题只有 20 个,因此可能没有多少人在这里看到您的问题并且知道足够的帮助。
-
FWIW,我订阅了这个标签,所以总是有人在看。 Stackoverflow 是解决此类问题的好地方。
标签: python json parallel-processing export-to-csv dask