【问题标题】:Datetime index-based slicing with Dask使用 Dask 进行基于日期时间索引的切片
【发布时间】:2021-05-06 14:25:03
【问题描述】:

我有两个数据框:links 有两个日期时间列,分别称为 onsetoffset,每一行都是一个事件。另一个数据帧称为sensors,使用频率为 1m 的日期时间索引进行索引,并且有约 600 列,每列用于一个传感器 ID。本质上,对于每个links 行,我想使用起始值和偏移值作为时间范围,从sensors 中分割相关行,通过获取平均值将它们聚合到行中,然后连接单行平均值传感器值水平到links 数据框。我已经设法通过遵循 Pandas 代码做到了这一点,它可以工作,但我有很多数据而且速度非常慢。

def search_sensors(sensors, start_time, stop_time):
    s = sensors[start_time: stop_time]
    s = s.mean()
    return s

# add column names of sensor-ids
links[sensors.columns] = None

for index, row in links.iterrows():
    start_time = row['start_time']
    stop_time = row['stop_time']
    mean_sensors = search_sensors(sensors, start_time, stop_time)
    links.iloc[index, sensors.columns] = mean_sensors.to_list()

我已经用 Dask 尝试了一些东西,但没有运气。

  1. 在 Pandas 中使用 dask.delayed(),我得到一个 UserWarning: Large object of size 35.62 MiB detected in task graph:
    mean_sensors_list = []

    for index, row in links.iterrows():
        start_time = row['start_time']
        stop_time = row['stop_time']
        mean_sensors = dask.delayed(search_sensors)(sensors, start_time, stop_time)
        links_list.append(mean_sensors)   # mean_sensors is delayed object containing a pandas.Series of shape (600, nan) 

    results = dask.compute(*mean_sensors_list)
  1. dask.dataframe() 与以下代码一起使用与 Pandas 一样慢,而且我在 Dask 仪表板中看不到任何并行化指示。
    sensors_dd = dd.from_pandas(sensors_interp, npartitions=1)
    links_dd = dd.from_pandas(links, npartitions=1)

    mean_sensors_list = []

    for index, row in links_dd.iterrows():
        start_time = row['start_time']
        stop_time = row['stop_time']
        mean_sensors = search_sensors(sensors_dd, start_time, stop_time)
        links_list.append(mean_sensors)   # mean_sensors is a dask.Series of shape (600, nan)

    results = dask.compute(*mean_sensors_list)
  1. 同时使用 1 和 2,即mean_sensors = dask.delayed(search_sensors)(sensors_dd, start_time, stop_time),mean_sensors 是一个延迟对象,包含一个 dask.Series 形状 (600, nan) 但执行速度很慢。 Dashboard 显示了 3 个任务(search_sensors、finalize、from_pandas)的一些并行化,4 个 worker 显示 CPU 使用率非常低。另外,当我运行 Ubuntu 时,它会显示一条磁盘空间不足的消息。

【问题讨论】:

    标签: pandas dask-delayed dask-dataframe


    【解决方案1】:

    Dask 的新手,我对 map_partitions() 不熟悉。问题的解决方法如下:

    res = links_dd.map_partitions(lambda df: df.apply((lambda row: search_sensors(sensors, row.start_time, row.stop_time)), axis=1)).compute()
    
    

    在我的 4 核笔记本电脑中速度非常快。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-09-26
      • 1970-01-01
      • 2016-04-16
      • 2016-01-18
      • 2020-01-02
      • 2015-07-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多