【发布时间】:2021-05-06 14:25:03
【问题描述】:
我有两个数据框:links 有两个日期时间列,分别称为 onset 和 offset,每一行都是一个事件。另一个数据帧称为sensors,使用频率为 1m 的日期时间索引进行索引,并且有约 600 列,每列用于一个传感器 ID。本质上,对于每个links 行,我想使用起始值和偏移值作为时间范围,从sensors 中分割相关行,通过获取平均值将它们聚合到行中,然后连接单行平均值传感器值水平到links 数据框。我已经设法通过遵循 Pandas 代码做到了这一点,它可以工作,但我有很多数据而且速度非常慢。
def search_sensors(sensors, start_time, stop_time):
s = sensors[start_time: stop_time]
s = s.mean()
return s
# add column names of sensor-ids
links[sensors.columns] = None
for index, row in links.iterrows():
start_time = row['start_time']
stop_time = row['stop_time']
mean_sensors = search_sensors(sensors, start_time, stop_time)
links.iloc[index, sensors.columns] = mean_sensors.to_list()
我已经用 Dask 尝试了一些东西,但没有运气。
- 在 Pandas 中使用
dask.delayed(),我得到一个UserWarning: Large object of size 35.62 MiB detected in task graph:
mean_sensors_list = []
for index, row in links.iterrows():
start_time = row['start_time']
stop_time = row['stop_time']
mean_sensors = dask.delayed(search_sensors)(sensors, start_time, stop_time)
links_list.append(mean_sensors) # mean_sensors is delayed object containing a pandas.Series of shape (600, nan)
results = dask.compute(*mean_sensors_list)
- 将
dask.dataframe()与以下代码一起使用与 Pandas 一样慢,而且我在 Dask 仪表板中看不到任何并行化指示。
sensors_dd = dd.from_pandas(sensors_interp, npartitions=1)
links_dd = dd.from_pandas(links, npartitions=1)
mean_sensors_list = []
for index, row in links_dd.iterrows():
start_time = row['start_time']
stop_time = row['stop_time']
mean_sensors = search_sensors(sensors_dd, start_time, stop_time)
links_list.append(mean_sensors) # mean_sensors is a dask.Series of shape (600, nan)
results = dask.compute(*mean_sensors_list)
- 同时使用 1 和 2,即
mean_sensors = dask.delayed(search_sensors)(sensors_dd, start_time, stop_time),mean_sensors 是一个延迟对象,包含一个 dask.Series 形状 (600, nan) 但执行速度很慢。 Dashboard 显示了 3 个任务(search_sensors、finalize、from_pandas)的一些并行化,4 个 worker 显示 CPU 使用率非常低。另外,当我运行 Ubuntu 时,它会显示一条磁盘空间不足的消息。
【问题讨论】:
标签: pandas dask-delayed dask-dataframe