【发布时间】:2020-04-23 19:22:01
【问题描述】:
我是 Dask 的新手,我认为这将是一项简单的任务。我想从多个 csv 文件中加载数据并将其合并到一个 Dask 数据帧中。在这个例子中,有 5 个 csv 文件,每个文件有 10,000 行数据。显然我想给组合的数据框一个唯一的索引。
所以我这样做了:
import dask.dataframe as dd
# Define Dask computations
dataframes = [
dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
for filename in os.listdir(data_dir) if filename.endswith('.csv')
]
combined_df = dd.concat(dataframes).reset_index(drop=True)
如果我这样做 combined_df.head().index 我会按预期得到:
RangeIndex(start=0, stop=5, step=1)
但是combined_df.tail().index 却不如预期:
RangeIndex(start=3252, stop=3257, step=1)
进一步检查发现combined_df 上的索引值由 15 个单独的系列组成,长度约为 3256,总长度为 50000。请注意,csv 文件的第一列都包含从 0 到 10000 的索引。
这里发生了什么?如何获得一个从 0 到 50000 的标准整数索引,即所有 csv 文件中的总行数?
背景
如果你需要测试上面的代码,这里有一个设置脚本来创建一些 csv 文件:
import os
import numpy as np
import pandas as pd
# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)
data_dir = 'data'
if not os.path.exists(data_dir):
os.mkdir(data_dir)
for i in range(5):
filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
if not os.path.exists(filepath):
data = (i + 1) * np.random.randn(shape[0], shape[1])
print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
pd.DataFrame(data).to_csv(filepath)
更新:
这种方法似乎也会出现同样的问题:
combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])
RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)
在我看来reset_index 方法会产生相同的索引。
【问题讨论】:
-
啊,我现在在 documentation 中看到它说“请注意,与 pandas 不同,重置
dask.dataframe索引不会从 0 单调增加。相反,它会从 0 重新开始分区(例如index1 = [0, ..., 10], index2 = [0, ...])。这是由于无法静态知道索引的完整长度。”
标签: python dataframe indexing concatenation dask