【问题标题】:df.groupby(...).apply(...) function in dask dataframedask 数据框中的 df.groupby(...).apply(...) 函数
【发布时间】:2019-09-15 10:42:19
【问题描述】:

我使用 Python dask 处理大型 csv 面板数据集 (15+GB),我需要执行 groupby(...).apply(...) 函数来删除每天每只股票的最后观察值。我的数据集看起来像

 stock     date     time   spread  time_diff 
  VOD      01-01    9:05    0.01     0:07     
  VOD      01-01    9:12    0.03     0:52     
  VOD      01-01   10:04    0.02     0:11
  VOD      01-01   10:15    0.01     0:10     
  VOD      01-01   10:25    0.03     0:39  
  VOD      01-01   11:04    0.02    22:00 
  VOD      01-02    9:04    0.02     0:05
  ...       ...     ...     ....     ...
  BAT      01-01    13:05   0.04    10:02
  BAT      01-02    9:07    0.05     0:03
  BAT      01-02    9:10    0.06     0:04
  ...       ...     ...     ....     ...

如果数据框在 pandas 中,那么这可以通过

df_new=df_have.groupby(['stock','date'], as_index=False).apply(lambda x: x.iloc[:-1])

此代码适用于 pandas df。但是,我无法在 dask 数据框中执行此代码。我做了以下尝试。

ddf_new=ddf_have.groupby(['stock','date']).apply(lambda x: x.iloc[:-1]).compute()

ddf_new=ddf_have.groupby(['stock','date']).apply(lambda x: x.iloc[:-1], meta=('stock' : 'f8')).compute()

ddf_new=ddf_have.groupby(['stock','date']).apply(lambda x: x.iloc[:-1], meta=meta).compute()

不幸的是,它们都不起作用。谁能帮我为 dask dataframe 获取正确的代码?谢了

【问题讨论】:

  • 如果你不运行compute,你只是在创建一个图表。使用 dask 的黄金法则是,如果操作在 pandas 中运行,则无需切换到 dask。你介意生成mcve吗?
  • @rpanai 谢谢。我添加了一个简单的示例。
  • 关于性能:您的数据是否存储在单个CSV 文件中?
  • @rpanai 是的......

标签: python pandas dataframe group-by dask


【解决方案1】:

我认为对于您的具体情况,问题是您分配的meta。这应该可以。

import pandas as pd
import numpy as np
import dask.dataframe as dd

dates = pd.date_range(start='2019-01-01',
                      end='2019-12-31',
                      freq='5T')

out = []
for stock in list("abcdefgh"):
    df = pd.DataFrame({"stock":[stock]*len(dates),
                       "date":dates,
                       "spread":np.random.randn(len(dates))})
    df["time_diff"] = df["date"].diff().shift(-1)
    df["time"] = df["date"].dt.time.astype(str)
    df["date"] = df["date"].dt.date.astype(str)
    out.append(df)
df = pd.concat(out, ignore_index=True)

del out

ddf = dd.from_pandas(df, npartitions=4)

out = ddf.groupby(['stock','date']).apply(lambda x: x[:-1],
                                          meta={"stock":"str",
                                                "date":"str",
                                                "spread":"f8",
                                                "time_diff":"str",
                                                "time":"str"})
out = out.compute().reset_index(drop=True)

如果您可以按股票日期很好地对文件进行分区并保存在to_parquet 中,那么您可以获得更好的性能,因为您可以使用map_partitions 而不是apply

【讨论】:

  • 感谢您的回复。我看到您在将其转换为 dask 之前已在 pandas 数据框中定义了每一列。我在开始时使用 dd.read_csv 将数据导入到 dask 数据框,并定义了每一列。我尝试了您的代码,但仍然无法获得输出。 ValueError: Key 2018-01-01 00:00:00 不在级别索引中。
  • 我刚刚创建了一个df。如果您使用df.to_csv("out.csv", index=False),然后使用ddf = dd.read_csv("out.csv"),则与您的工作案例相同。
  • 此时间戳2018-01-01 00:00:00 不在我的示例中。你介意检查一下你的数据框的dtypes 吗?
  • 感谢您的回复。我使用了ddf.astype,我得到了股票是对象,日期是对象,价差是 float64,时间是 str。但是,当我尝试 meta={"stock":"object", "date":"object", "spread":"f8", "time_diff":"f8", "time":"str"} 时,我得到了`计算数据中的列与提供的元数据中的列不匹配`。蚂蚁的想法?再次感谢。
  • 经过多次尝试,我发现如果我按单列索引分组而不提供meta,我可以运行代码。如果我按两列分组,那么我只能运行简单的语法,例如 ddf.groupby(['stock','date']).sum()
猜你喜欢
  • 1970-01-01
  • 2018-12-05
  • 2013-04-18
  • 1970-01-01
  • 1970-01-01
  • 2013-09-01
  • 1970-01-01
  • 2012-03-26
  • 1970-01-01
相关资源
最近更新 更多