【问题标题】:Computation on sample of dask dataframe takes much longer than on all the data对 dask 数据帧样本的计算比对所有数据的计算时间长得多
【发布时间】:2018-03-09 03:03:41
【问题描述】:

我有一个由镶木地板支持的 dask 数据框。这是 1.31 亿行,当我对整个框架进行一些基本操作时,它们需要几分钟时间。

df = dd.read_parquet('data_*.pqt')
unique_locations = df.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
total_locations = len(unique_locations)
n_https = https.sum().compute()
n_http = http.sum().compute()

时间:

CPU times: user 2min 49s, sys: 23.9 s, total: 3min 13s
Wall time: 1min 53s

我天真地认为,如果我抽取一个数据样本,我这次可以降低,然后做了:

df = dd.read_parquet('data_*.pqt')
df = df.sample(frac=0.05)
unique_locations = df.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
total_locations = len(unique_locations)
n_https = https.sum().compute()
n_http = http.sum().compute()

时间:

Unknown, I stopped it after 45minutes.

我猜我的示例无法有效地访问我的所有后续计算,但我不知道如何解决它。

我对从 dask 数据帧中采样数据然后使用该示例的最佳方法感兴趣。

【问题讨论】:

  • 如果我做df = df.get_partition(0),那么计算运行得很快,但这不是我数据的随机样本。我的数据框的分区数与磁盘上的文件数相同,所以我可以用 pandas 读取一个文件,但这不是我想做的采样。
  • 没有线索。我建议阅读Understanding Performance 文档,特别是尝试使用 dask.distributed 诊断仪表板来了解什么占用了时间。我建议查看剖面图youtube link
  • 另外,您可能需要考虑将三个计算调用与@​​987654329@ 合二为一
  • 我已阅读了解性能。我有一些不错的任务图,但我不了解快速与快速任务图的外观。我的慢速计算有很多并行组件,它们最后都聚集在一起。我认为这会很快。
  • 我在本地机器上使用 dask,而不是分布式。我使用了分析器(ResourceProfiler、CacheProfiler 等)并在任务完成后将它们可视化,如此处所述:dask.pydata.org/en/latest/diagnostics-local.html。但是我看不到在操作进行时可视化的方法,所以我不明白如何利用这些工具来处理我运行缓慢的过程。 (最后一次我等了一个多小时 frac=0.01)

标签: dask


【解决方案1】:

我没有明确/简单的答案,但我确实有很多东西可以共同解决我的问题。

1) 我的代码效率低下,选择我需要处理的特定列可以使一切正常。我的新代码:

import dask.dataframe as dd
from dask.distributed import Client, progress
client = Client()  # Took me a little while to get the settings correct

def get_df(*columns):
    files = '../cache_new/sample_*.pqt'
    df = dd.read_parquet(files, columns=columns, engine='pyarrow')
    return df

# All data - Takes 31s
df_all = get_df('location')
unique_locations = df_all.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
_total_locations = unique_locations.size.persist()
_n_https = https.sum().persist()
_n_http = http.sum().persist()
progress(_total_locations, _n_https, _n_http)

# 1% sample data - Takes 21s
df_sample = get_df('location').sample(frac=0.01)
unique_locations = df_sample.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
_total_locations = unique_locations.size.persist()
_n_https = https.sum().persist()
_n_http = http.sum().persist()
progress(_total_locations, _n_https, _n_http)

事实证明这并不是一个很大的加速。整个计算所花费的时间主要是读入数据。如果计算非常昂贵,我想我会看到更多的加速。

2) 我切换到在本地使用分布式调度程序,这样我就可以看到发生了什么。但这并非没有问题:

  1. 我在使用 fastparquet 时遇到了某种错误,导致 我的进程死了,我需要使用 pyarrow(不使用分布式客户端时这不是问题)
  2. 我不得不手动设置线程数和 memory_limit

3) 我发现一个在笔记本中多次读取相同数据的错误 - https://github.com/dask/dask/issues/3268

4) 我也遇到了 pandas https://github.com/pandas-dev/pandas/issues/19941#issuecomment-371960712 中的内存泄漏错误

由于 (3) 和 (4) 以及在我的原始代码中我在所有列中的阅读效率低下这一事实,我看到了为什么我的示例无法正常工作的许多原因,尽管我从未找到明确的答案。

【讨论】:

    【解决方案2】:

    这里发生的情况是,通过添加示例,您将停止优化发生。当您执行以下操作时:

    df = dd.read_parquet('data_*.pqt')
    df.x.sum()
    

    Dask 巧妙地将其重新排列为以下内容:

    df = dd.read_parquet('data_*.pqt', columns=['x'])
    df.x.sum()
    

    Dask.dataframe 仅读取您需要的一列。这是 dask.dataframe 提供的少数优化之一(它没有做太多高级优化)。

    但是,当您将样本放入其中(或任何操作)时

    df = dd.read_parquet('data_*.pqt', columns=['x'])
    df.sample(...).x.sum()
    

    然后你没有得到优化,所以一切都很慢。

    所以这里并不是说样本很慢,而是来自 parquet 的整个数据集很慢,并且在 read_parquet 和列访问步骤之间存在样本会阻止优化的发生。

    始终在 read_parquet 中指定列

    为避免这种情况,您应该始终在 dd.read_parquet 中明确指定您需要的列。

    最终,很高兴看到一些高级框架提供比 Dask 数据框更智能的查询优化。如果您想推动这一进程,您可能会在 Ibis

    上提出问题

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-06-19
      • 2021-10-17
      • 1970-01-01
      • 1970-01-01
      • 2019-05-24
      • 1970-01-01
      • 1970-01-01
      • 2013-10-17
      相关资源
      最近更新 更多