【问题标题】:How to reset index on concatenated dataframe in Dask如何在 Dask 中重置连接数据帧的索引
【发布时间】:2020-04-23 19:22:01
【问题描述】:

我是 Dask 的新手,我认为这将是一项简单的任务。我想从多个 csv 文件中加载数据并将其合并到一个 Dask 数据帧中。在这个例子中,有 5 个 csv 文件,每个文件有 10,000 行数据。显然我想给组合的数据框一个唯一的索引。

所以我这样做了:

import dask.dataframe as dd

# Define Dask computations
dataframes = [
    dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
    for filename in os.listdir(data_dir) if filename.endswith('.csv')
]

combined_df = dd.concat(dataframes).reset_index(drop=True)

如果我这样做 combined_df.head().index 我会按预期得到:

RangeIndex(start=0, stop=5, step=1)

但是combined_df.tail().index 却不如预期:

RangeIndex(start=3252, stop=3257, step=1)

进一步检查发现combined_df 上的索引值由 15 个单独的系列组成,长度约为 3256,总长度为 50000。请注意,csv 文件的第一列都包含从 0 到 10000 的索引。

这里发生了什么?如何获得一个从 0 到 50000 的标准整数索引,即所有 csv 文件中的总行数?

背景

如果你需要测试上面的代码,这里有一个设置脚本来创建一些 csv 文件:

import os
import numpy as np
import pandas as pd

# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)

data_dir = 'data'
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

for i in range(5):
    filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
    if not os.path.exists(filepath):
        data = (i + 1) * np.random.randn(shape[0], shape[1])
        print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
        pd.DataFrame(data).to_csv(filepath)

更新:

这种方法似乎也会出现同样的问题:

combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])

RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)

在我看来reset_index 方法会产生相同的索引。

【问题讨论】:

  • 啊,我现在在 documentation 中看到它说“请注意,与 pandas 不同,重置 dask.dataframe 索引不会从 0 单调增加。相反,它会从 0 重新开始分区(例如index1 = [0, ..., 10], index2 = [0, ...])。这是由于无法静态知道索引的完整长度。”

标签: python dataframe indexing concatenation dask


【解决方案1】:

dask 版本中,reset_index单独执行其任务 (并同时)在每个分区上,因此索引中的连续数字 “重启”作为一些点,实际上是在每个分区的开始。

要规避此限制,您可以:

  • 分配一个用 1 填充的新列。
  • 将索引设置为 cumsum() - 1 在此列上计算(幸运的是, 与reset_index相反,cumsum是在整体上计算的 数据帧)。

一个副作用是索引的名称现在是这个新的名称 柱子。 如果要清除它,则必须在分区级别进行,调用 map_partitions.

所以整个代码可以是:

ddf = ddf.assign(idx=1)
ddf = ddf.set_index(ddf.idx.cumsum() - 1)
ddf = ddf.map_partitions(lambda df: df.rename(index = {'idx': None}))

注意 assign(idx=1) 是可以的,因为这显然 single 值是 广播到整个DataFrame的长度,所以每个元素 在这个新列中将设置为 1,而我不知道有多少 DataFrame 包含的行。这是该系列的一大特色 底层 Numpy 包,大大简化了编程 在 NumpyPandas 以及 dask 中。

然后你可以运行:ddf.compute() 来查看结果。

【讨论】:

  • 有趣。感谢您的解决方法,但我们肯定希望有一个像在 Pandas 上一样工作的 reset_index 方法吗?
  • 先从 dask DataFrame 转换到 Pandas 版本然后然后调用 reset_index()?这样,“pandasonic”版本将在整个 DataFrame 上运行,而不是在单个分区上运行(就像在 dask 中那样)。
  • 我的数据框不适合内存。这就是我使用 Dask 的原因。
  • ddf = ddf.set_index(np.arange(len(ddf)))不起作用是什么原因?这是我通常会在 Pandas 中替换索引的方法。 (引发KeyError)。
  • 在 Pandas documentation for set_index 中,它说您可以使用“一个或多个现有列或数组(长度正确)”,而在 Dask 文档中它只说“使用现有列”。所以我想总体而言,Dask 与 Pandas 相比具有部分功能。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-02-18
  • 2021-01-12
  • 2017-09-04
  • 2018-02-26
  • 2017-10-04
  • 1970-01-01
  • 2023-01-12
相关资源
最近更新 更多