【问题标题】:How to efficiently concat csv files in dask horizontally, then vertically?如何在 dask 中有效地连接 csv 文件,然后垂直连接?
【发布时间】:2022-08-14 16:49:18
【问题描述】:

给定 3 个相同行数的 csv 文件,如下所示

fx.csv:

7.23,4.41,0.17453,0.12
6.63,3.21,0.3453,0.32
2.27,2.21,0.3953,0.83

f0.csv:

1.23,3.21,0.123,0.12
8.23,9.21,0.183,0.32
7.23,6.21,0.123,0.12

f1.csv

6.23,3.21,0.153,0.123
2.23,2.26,0.182,0.22
9.23,9.21,0.183,0.135

f0.csvf1.csv 带有相应的标签 0s 和 1s。

目标是读入dask.DataFrame。我们得到的级联值

  1. fx.csvf0.csv0s 水平连接
  2. fx.csvf1.csv1s 水平连接
  3. 垂直连接 (1) 和 (2)

    我尝试这样做以将它们读入 dask 文件并保存到 hdf 存储中:

    import dask.dataframe as dd
    import dask.array as da
    
    fx = dd.read_csv(\'fx.csv\', header=None)
    f0 = dd.read_csv(\'f0.csv\', header=None)
    f1 = dd.read_csv(\'f1.csv\', header=None)
    
    l0 = dd.from_array(np.array([1] * len(fx)))
    l1 = dd.from_array(np.array([1] * len(fx)))
    
    da.to_np_stack(\'data/\', 
      da.concatenate( [
        dd.concat([fx.compute(), f0.compute(), l0.compute()], axis=1),
        dd.concat([fx.compute(), f1.compute(), l1.compute()], axis=1)
        ], axis=0, allow_unknown_chunksizes=True),
      axis=0)
    
    

    我也可以在将其读入 dask 文件之前在 unix 中执行这些操作,如下所示:

    # Create the label files.
    $ wc -l fx.csv
    4
    
    $ seq 4 | sed \"c 0\" > l0.csv
    $ seq 4 | sed \"c 0\" > l1.csv
    
    # Concat horizontally
    $ paste fx.csv f0.csv l0.csv -d\",\" > x0.csv
    $ paste fx.csv f1.csv l1.csv -d\",\" > x1.csv
    
    $ cat x0.csv x1.csv > data.csv
    

    实际数据集每个 f*.csv 文件有 256 列和 22,000,000 行。所以运行 dask python 代码并不容易。

    我的问题(部分是):

    1. Python 代码中的 dask 方法是读取数据并将其输出到 hdf5 存储中的最简单/内存有效的方法吗?

    2. 有没有比上面描述的unix方式更有效的其他方法?

  • 看看this answer,不完全一样,但可能对你有帮助。

标签: python dataframe csv dask dask-dataframe


【解决方案1】:

下面的代码是您的 sn-p 的修改版本。

读取csv时,跨分区的行分配是 基于块大小,因此基本的 concat 操作不是 保证开箱即用,因为分区可能不会 对齐。要解决它,请索引数据。

接下来,可以使用.assign 方法创建 0/1 列(与pandas 中的工作方式相同)。在保存数组之前,您可能还想按照this answer 中的说明重新分块,但这是可选的。

import dask.dataframe as dd
import dask.array as da

def _index_ddf(df):
   """Generate a unique row-based index. See also https://stackoverflow.com/a/65839787/10693596"""
   df['new_index'] = 1
   df['new_index'] = df['new_index'].cumsum()
   df = df.set_index('new_index', sorted=True)
   return df

fx = dd.read_csv('fx.csv', header=None)
fx = _index_ddf(fx)

f0 = dd.read_csv('f0.csv', header=None)
f0 = _index_ddf(f0)

f1 = dd.read_csv('f1.csv', header=None)
f1 = _index_ddf(f1)

# columns of 0/1 can be created by assignment
A1 = dd.concat([fx, f0], axis=1).assign(zeros=0).to_dask_array(lengths=True)
A2 = dd.concat([fx, f1], axis=1).assign(ones=1).to_dask_array(lengths=True)

# stack
A = da.concatenate([A1, A2], axis=0)

# save
da.to_npy_stack('data/', A, axis=0)

#optional: to have even sized chunks, can rechunk the data, see https://stackoverflow.com/a/73218995/10693596

【讨论】:

    【解决方案2】:

    您可以逐行读取文件并通过它们创建新的 .csv,而不是首先将所有数据加载到您的 ram 中。下面的代码为你做:

    FILE_PATHS = [
        '/home/amir/data/1.csv',
        '/home/amir/data/2.csv',
        '/home/amir/data/3.csv',
    ]
    
    NEW_FILE_PATH = '/home/amir/data/new.csv'
    
    fout = open(NEW_FILE_PATH, 'w')
    
    for file_path in FILE_PATHS:
        with open(file_path, 'r') as fin:
            for line in fin:
                fout.write(line)
    

    【讨论】:

      猜你喜欢
      • 2017-10-04
      • 1970-01-01
      • 2021-08-25
      • 1970-01-01
      • 2016-02-29
      • 2012-08-29
      • 2020-07-20
      • 1970-01-01
      • 2020-03-18
      相关资源
      最近更新 更多