【问题标题】:Using Dask Delayed on Small/Partitioned Dataframes在小型/分区数据帧上使用延迟的 Dask
【发布时间】:2020-01-24 20:27:07
【问题描述】:

我正在处理时间序列数据,这些数据的格式设置为每一行都是 ID/时间/数据的单个实例。这意味着每个 ID 的行不对应 1 对 1。每个 ID 在时间上都有很多行。

我正在尝试使用 dask delay 来让函数在整个 ID 序列上运行(该操作应该能够同时在每个单独的 ID 上运行是有道理的,因为它们不会相互影响)。为此,我首先遍历每个 ID 标签,从该 ID 中提取/定位所有数据(在 pandas 中使用 .loc,因此它是一个单独的“迷你”df),然后延迟对迷你 df 的函数调用,添加具有延迟值的列并将其添加到所有迷你 dfs 的列表中。在 for 循环结束时,我想一次在所有迷你 dfs 上调用 dask.compute() 但由于某种原因迷你 df 的值仍然延迟。下面我将发布一些关于我刚刚试图解释的伪代码。

我有一种感觉,这可能不是解决此问题的最佳方法,但在当时这是有道理的,我不明白出了什么问题,因此非常感谢任何帮助。

这是我想要做的:

list_of_mini_dfs = []
for id in big_df:

    curr_df = big_df.loc[big_df['id'] == id]
    curr_df['new value 1'] = dask.delayed(myfunc)(args1)
    curr_df['new value 2'] = dask.delayed(myfunc)(args2) #same func as previous line

    list_of_mini_dfs.append(curr_df)

list_of_mini_dfs = dask.delayed(list_of_mini_dfs).compute()

Concat all mini dfs into new big df.

正如您从代码中看到的那样,我必须进入我的大/整体数据框以提取每个 ID 的数据序列,因为它散布在行中。我希望能够对该单个 ID 的数据调用延迟函数,然后将函数调用中的值返回到大/整体数据帧中。

目前这种方法不起作用,当我将所有迷你数据帧重新连接在一起时,我延迟的两个值仍然延迟,这让我认为这是由于我在 df 和试图计算数据帧列表。我只是不知道如何解决它。

希望这比较清楚,感谢您的帮助。

【问题讨论】:

  • 您介意分享mcve吗?

标签: pandas parallel-processing time-series dask dask-delayed


【解决方案1】:

IIUC 您正在尝试使用 dask 执行某种transform

import pandas as pd
import dask.dataframe as dd
import numpy as np

# generate big_df
dates = pd.date_range(start='2019-01-01',
                      end='2020-01-01')
l = len(dates)
out = []
for i in range(1000):
    df = pd.DataFrame({"ID":[i]*l,
                       "date": dates,
                       "data0": np.random.randn(l),
                       "data1": np.random.randn(l)})

    out.append(df)

big_df = pd.concat(out, ignore_index=True)\
           .sample(frac=1)\
           .reset_index(drop=True)

现在您想在 data0data1 列上应用您的函数 fun

熊猫

out = big_df.groupby("ID")[["data0","data1"]]\
            .apply(fun)\
            .reset_index()

df_pd = pd.merge(big_df, out, how="left", on="ID" )

黎明

df = dd.from_pandas(big_df, npartitions=4)

out = df.groupby("ID")[["data0","data1"]]\
        .apply(fun, meta={'data0':'f8',
                          'data1':'f8'})\
        .rename(columns={'data0': 'new_values0',
                         'data1': 'new_values1'})\
        .compute() # Here you need to compute otherwise you'll get NaNs

df_dask = dd.merge(df, out,
                   how="left", 
                   left_on=["ID"],
                   right_index=True)

dask 版本不一定比 pandas 快。特别是如果您的 df 适合 RAM。

【讨论】:

  • 您好,rpanai,感谢您在这里的帮助,看来您确实很好地理解了这个问题,我将尝试上述建议。我在想 dask 延迟将是要走的路,但是在阅读了您的实现之后,我看到了如何使用 dask 数据帧来做到这一点。再次感谢您。
  • 没问题。如果答案有帮助,请考虑接受和/或投票。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-06
  • 2017-11-23
  • 2023-04-04
  • 2019-07-24
相关资源
最近更新 更多