【问题标题】:Read multiple parquet files in a folder and write to single csv file using python读取文件夹中的多个镶木地板文件并使用 python 写入单个 csv 文件
【发布时间】:2019-01-12 18:22:28
【问题描述】:

我是 python 新手,我有一个场景,其中有多个 parquet 文件,文件名按顺序排列。例如:par_file1、par_file2、par_file3 等等,一个文件夹中最多 100 个文件。

我需要从 file1 开始依次读取这些 parquet 文件并将其写入单个 csv 文件。写入 file1 的内容后,file2 的内容应附加到相同的 csv 中,而不需要标题。请注意,所有文件都有相同的列名,只有数据被拆分为多个文件。

我学会了使用 pyarrow 将单个 parquet 转换为 csv 文件,代码如下:

import pandas as pd    
df = pd.read_parquet('par_file.parquet')    
df.to_csv('csv_file.csv')

但我无法将其扩展为循环多个镶木地板文件并附加到单个 csv。 熊猫中有没有办法做到这一点?或任何其他方法都会有很大帮助。谢谢。

【问题讨论】:

  • 您是否在与 parquet 文件相同的目录中运行您的 python 代码?
  • 我正在考虑将镶木地板文件复制到本地文件夹并从本地计算机运行 python 代码。我对 python 完全陌生,不知道在哪里运行 python 代码。如果可以在代码中给出 hdfs 文件夹位置并将内容复制到本地的 csv 文件中,那也很好。如果我没有正确理解您的查询,我深表歉意。

标签: pandas csv parquet


【解决方案1】:

您可以使用 Dask 读取多个 Parquet 文件并将它们写入单个 CSV。

Dask 接受星号 (*) 作为通配符/全局字符以匹配相关文件名。

确保在写入 CSV 文件时将 single_file 设置为 True 并将 index 设置为 False

import pandas as pd
import numpy as np

# create some dummy dataframes using np.random and write to separate parquet files
rng = np.random.default_rng()

for i in range(3):
    df = pd.DataFrame(rng.integers(0, 100, size=(10, 4)), columns=list('ABCD'))
    df.to_parquet(f"dummy_df_{i}.parquet")

# load multiple parquet files with Dask
import dask.dataframe as dd
ddf = dd.read_parquet('dummy_df_*.parquet', index=False)

# write to single csv
ddf.to_csv("dummy_df_all.csv", 
           single_file=True, 
           index=False
)

# test to verify
df_test = pd.read_csv("dummy_df_all.csv")

为此使用 Dask 意味着您不必担心生成的文件大小(Dask 是一个分布式计算框架,可以处理您扔给它的任何东西,而如果生成的 DataFrame 太大,pandas 可能会抛出 MemoryError)并且您可以轻松地从 Amazon S3 等云数据存储中读取和写入。

【讨论】:

    【解决方案2】:

    我也有类似的需求,并且我阅读了当前的 Pandas 版本支持目录路径作为 read_csv 函数的参数。所以你可以像这样读取多个 parquet 文件:

    import pandas as pd    
    df = pd.read_parquet('path/to/the/parquet/files/directory')    
    

    它将所有内容连接到一个数据帧中,因此您可以立即将其转换为 csv:

    df.to_csv('csv_file.csv')
    

    根据文档确保您具有以下依赖项:

    • pyarrow
    • 快速镶木地板

    【讨论】:

      【解决方案3】:

      对于那些试图读取远程文件的人来说,这是一个小改动,这有助于更快地读取它(远程文件的直接 read_parquet 对我来说要慢得多):

      import io
      merged = []
      # remote_reader = ... <- init some remote reader, for example AzureDLFileSystem()
      for f in files:
          with remote_reader.open(f, 'rb') as f_reader:
              merged.append(remote_reader.read())
      merged = pd.concat((pd.read_parquet(io.BytesIO(file_bytes)) for file_bytes in merged))
      

      虽然会增加一点临时内存开销。

      【讨论】:

        【解决方案4】:

        这帮助我将所有 parquet 文件加载到一个数据帧中

        import glob
         files = glob.glob("*.snappy.parquet")
         data = [pd.read_parquet(f,engine='fastparquet') for f in files]
         merged_data = pd.concat(data,ignore_index=True)
        

        【讨论】:

          【解决方案5】:

          我遇到了这个问题,想看看 pandas 是否可以本地读取分区 parquet 数据集。我不得不说当前的答案是不必要的冗长(使其难以解析)。我还认为不断打开/关闭文件句柄然后根据大小扫描到它们的末尾并不是特别有效。

          更好的选择是将所有 parquet 文件读入单个 DataFrame,然后写入一次:

          from pathlib import Path
          import pandas as pd
          
          data_dir = Path('dir/to/parquet/files')
          full_df = pd.concat(
              pd.read_parquet(parquet_file)
              for parquet_file in data_dir.glob('*.parquet')
          )
          full_df.to_csv('csv_file.csv')
          

          或者,如果您真的只想追加到文件中:

          data_dir = Path('dir/to/parquet/files')
          for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
              df = pd.read_parquet(parquet_path)
              write_header = i == 0 # write header only on the 0th file
              write_mode = 'w' if i == 0 else 'a' # 'write' mode for 0th file, 'append' otherwise
              df.to_csv('csv_file.csv', mode=write_mode, header=write_header)
          

          附加每个文件的最后替代方案,在开始时以"a+" 模式打开目标 CSV 文件,每次写入/附加时将文件句柄扫描到文件末尾(我相信这可行,但没有t 实际上测试过):

          data_dir = Path('dir/to/parquet/files')
          with open('csv_file.csv', "a+") as csv_handle:
              for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
                  df = pd.read_parquet(parquet_path)
                  write_header = i == 0 # write header only on the 0th file
                  df.to_csv(csv_handle, header=write_header)
          

          【讨论】:

          • 这种 readall+concat 方法将受到内存(8G,16GB)的限制,无论它是什么。 open+append 不会。
          【解决方案6】:

          如果您要将文件复制到本地计算机并运行您的代码,您可以执行以下操作。下面的代码假定您在与 parquet 文件相同的目录中运行代码。它还假定您在上面提供的文件命名:“顺序。例如:par_file1、par_file2、par_file3 等等,一个文件夹中最多 100 个文件。”如果您需要搜索文件,则需要使用 glob 获取文件名并明确提供要保存 csv 的路径:open(r'this\is\your\path\to\csv_file.csv', 'a') 希望这会有所帮助。

          import pandas as pd
          
          # Create an empty csv file and write the first parquet file with headers
          with open('csv_file.csv','w') as csv_file:
              print('Reading par_file1.parquet')
              df = pd.read_parquet('par_file1.parquet')
              df.to_csv(csv_file, index=False)
              print('par_file1.parquet appended to csv_file.csv\n')
              csv_file.close()
          
          # create your file names and append to an empty list to look for in the current directory
          files = []
          for i in range(2,101):
              files.append(f'par_file{i}.parquet')
          
          # open files and append to csv_file.csv
          for f in files:
              print(f'Reading {f}')
              df = pd.read_parquet(f)
              with open('csv_file.csv','a') as file:
                  df.to_csv(file, header=False, index=False)
                  print(f'{f} appended to csv_file.csv\n')
          

          您可以根据需要删除打印语句。

          python 3.6 中使用pandas 0.23.3 测试

          【讨论】:

          • 非常感谢,这正是我想要的。我通过将绝对路径替换为 parquet 和 csv 文件位置来运行此代码 n 空闲。我这里还有一个问题。如果我没有将文件复制到本地并且想直接从 hdfs 文件夹位置读取镶木地板,然后将内容附加到本地的 csv 文件中,那么有没有办法在此代码中提供 hdfs 路径?
          • @Pri31 这个文档应该有帮助:crs4.github.io/pydoop/api_docs/hdfs_api.html
          猜你喜欢
          • 2017-07-26
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-04-19
          • 2018-08-13
          • 2019-06-02
          • 1970-01-01
          • 2016-09-12
          相关资源
          最近更新 更多