【问题标题】:Large csv to parquet using Dask - OOM使用 Dask 将大型 csv 转换为镶木地板 - OOM
【发布时间】:2020-06-03 20:28:26
【问题描述】:

我有 7 个 8 GB 的 csv 文件,需要转换为 parquet。

内存使用量达到 100 GB,我不得不杀死它。 我也尝试过分布式 Dask。内存限制为 12 GB,但长时间没有输出。 供参考。我曾经使用具有 Chunking + Prod 消费者的传统 pandas --> 能够在 30 分钟内转换 Dask 处理缺少什么?

def ProcessChunk(df,...):
    df.to_parquet()     

for factfile in fArrFileList:
   df = dd.read_csv(factfile, blocksize="100MB",
                 dtype=fColTypes, header=None, sep='|',names=fCSVCols)
   result = ProcessChunk(df,output_parquet_file, chunksize, fPQ_Schema, fCSVCols, fColTypes)

【问题讨论】:

标签: dask


【解决方案1】:

感谢大家的建议。 map_partitions 工作。

df = dd.read_csv(filename, blocksize="500MB",
                     dtype=fColTypes, header=None, sep='|',names=fCSVCols)
df.map_partitions(DoWork,output_parquet_file, chunksize, Schema, CSVCols, fColTypes).compute(num_workers=2)

但是 Dask 分布式本地集群的相同方法效果不佳。当 csv 大小

【讨论】:

    【解决方案2】:

    我遇到了类似的问题,我发现使用 dask 分割成最小的镶木地板非常慢,最终会失败。如果您可以访问 Linux 终端,则可以使用并行或拆分。有关来自here的使用检查答案示例

    我的工作流程假设您的文件名为 file1.csv,..., file7.csv 并存储在 data/raw 中。我假设您正在使用笔记本中的终端命令,这就是我添加 %%bash 魔术的原因

    • 创建文件夹data/raw_part/part1/,... ,data/raw_part/part7/
    %%bash
    for year in {1..7}
    do
    mkdir -p data/raw_parts/part${i}
    done
    
    • 为每个文件运行(以防您想使用并行)
    %%bash
    cat data/raw/file1.csv | parallel --header : --pipe -N1000000 'cat >data/raw_parts/part1/file_{#}.csv'```
    

    将文件转换为镶木地板

    1. 首先创建输出文件夹
    %%bash
    for year in {1..7}
    do
    mkdir -p data/processed/part${i}
    done
    
    1. 定义将 csv 转换为 parquet 的函数
    import pandas as pd
    import os
    from dask import delayed, compute
    
    # this can run in parallel
    @delayed
    def convert2parquet(fn, fldr_in, fldr_out):
        fn_out = fn.replace(fldr_in, fldr_out)\
                   .replace(".csv", ".parquet")
    
        df = pd.read_csv(fn)
        df.to_parquet(fn_out, index=False)
    
    1. 获取所有要转换的文件
    jobs = []
    fldr_in = "data/raw_parts/"
    for (dirpath, dirnames, filenames) in os.walk(fldr_in):
        if len(filenames) > 0:
            jobs += [os.path.join(dirpath, fn)
                     for fn in filenames]
    
    1. 并行处理
    %%time
    to_process = [convert2parquet(job, fldr_in, fldr_out) for job in jobs]
    out = compute(to_process)
    

    【讨论】:

      猜你喜欢
      • 2018-11-09
      • 1970-01-01
      • 2014-11-25
      • 2018-01-04
      • 2019-03-23
      • 1970-01-01
      • 1970-01-01
      • 2020-09-22
      • 1970-01-01
      相关资源
      最近更新 更多