【问题标题】:Efficiently load and manipulate csv using dask DataFrame使用 dask DataFrame 有效地加载和操作 csv
【发布时间】:2020-07-08 09:37:35
【问题描述】:

我正在尝试使用dask.dataframe 操作来自https://www.kaggle.com/raymondsunartio/6000-nasdaq-stocks-historical-daily-prices 的csv 文件。原始数据框包含 'date'、'ticker'、'open'、'close'、 等列...

我的目标是创建一个新的数据框,其中索引 'date' 和列作为每个唯一代码的收盘价。

以下代码可以解决问题,但速度很慢,N = 6 使用了将近一分钟。我怀疑 dask 会尝试在 for 循环中多次读取 CSV 文件,但 我不知道如何才能让它更快。 我最初的猜测是使用 df.groupby('ticker')某个地方会有所帮助,但我对熊猫还不够熟悉。

import dask.dataframe as dd
from functools import reduce

def load_and_fix_csv(path: str, N: int, tickers: list = None) -> dd.DataFrame:
    raw = dd.read_csv(path, parse_dates=["date"])
    if tickers is None:
        tickers = raw.ticker.unique().compute()[:N] # Get unique tickers
    dfs = []
    for tick in tickers:
        tmp = raw[raw.ticker == tick][["date", "close"]] # Temporary dataframe from specific ticker with columns date, close
        dfs.append(tmp)
    df = reduce(lambda x, y: dd.merge(x, y, how="outer", on="date"), dfs) # Merge all dataframes on date
    df = df.set_index("date").compute()
    return df

感谢您的各种帮助! 谢谢。

【问题讨论】:

    标签: python python-3.x pandas dask dask-dataframe


    【解决方案1】:

    我很确定你是对的,Dask 很可能会在每个循环中“回到井中”;这是因为 Dask 构建了一个操作图并尝试将计算推迟到强制或必要时。我喜欢做的一件事是用Client.persist 截断图形的读取操作:

    from distributed import Client
    
    client = Client()
    
    
    def persist_load_and_fix_csv(path: str, N: int, tickers: list = None) -> dd.DataFrame:
        raw = dd.read_csv(path, parse_dates=["date"])
    
        # This "cuts the graph" prior operations (just the `read_csv` here)
        raw = client.persist(raw)
        if tickers is None:
            tickers = raw.ticker.unique().compute()[:N] # Get unique tickers
        dfs = []
        for tick in tickers:
            tmp = raw[raw.ticker == tick][["date", "close"]] # Temporary dataframe from specific ticker with columns date, close
            dfs.append(tmp)
        df = reduce(lambda x, y: dd.merge(x, y, how="outer", on="date"), dfs) # Merge all dataframes on date
        df = df.set_index("date").compute()
        return df
    

    在 Kaggle 会话中,我使用 persist_load_and_fix_csv(csv_path, N=3) 测试了这两个函数,并设法将时间缩短了一半。通过只保留最终使用的列,您还可以获得更好的性能。

    (注意:我发现,至少对我和我的代码而言,如果我开始看到 .compute() 出现在我应该退后一步并重新评估代码路径的函数中;我将其视为代码气味)

    【讨论】:

    • 谢谢,我会试试这个! “保留我最终使用的列”是指在阅读 csv 文件时?
    • 当然可以在那个时候完成,基本上尽量不要用你最终不会使用的数据锁定内存——至少删除client.persist之前的列
    猜你喜欢
    • 2020-02-01
    • 2019-10-30
    • 2020-06-07
    • 2022-08-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-12
    • 1970-01-01
    相关资源
    最近更新 更多