【问题标题】:How do I read a large file from disk to database without running out of memory如何在不耗尽内存的情况下从磁盘读取大文件到数据库
【发布时间】:2012-01-24 14:07:51
【问题描述】:

我觉得问这个问题很尴尬,因为我觉得我应该已经知道了。但是,鉴于我没有......我想知道如何将大文件从磁盘读取到数据库而不会出现 OutOfMemory 异常。具体来说,我需要加载 CSV(或真正的制表符分隔文件)。

我正在尝试CSVReader,特别是this code sample,但我确定我做错了。他们的一些other coding samples 展示了如何读取 any 大小的流文件,这几乎是我想要的(只需要从磁盘读取),但我不知道是什么类型的IDataReader 我可以创建以允许这样做。

我正在直接从磁盘读取,并且我试图通过一次读取太多数据来确保我不会耗尽内存。我不禁想到我应该能够使用BufferedFileReader 或类似的东西,我可以指向文件的位置并指定缓冲区大小,然后CsvDataReader 期望IDataReader 作为它的第一个参数,它可以使用它。请告诉我我的方法的错误,让我摆脱我的GetData 方法及其任意文件分块机制,并帮助我解决这个基本问题。

    private void button3_Click(object sender, EventArgs e)
    {   
        totalNumberOfLinesInFile = GetNumberOfRecordsInFile();
        totalNumberOfLinesProcessed = 0; 

        while (totalNumberOfLinesProcessed < totalNumberOfLinesInFile)
        {
            TextReader tr = GetData();
            using (CsvDataReader csvData = new CsvDataReader(tr, '\t'))
            {
                csvData.Settings.HasHeaders = false;
                csvData.Settings.SkipEmptyRecords = true;
                csvData.Settings.TrimWhitespace = true;

                for (int i = 0; i < 30; i++) // known number of columns for testing purposes
                {
                    csvData.Columns.Add("varchar");
                }

                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(@"Data Source=XPDEVVM\XPDEV;Initial Catalog=MyTest;Integrated Security=SSPI;"))
                {
                    bulkCopy.DestinationTableName = "work.test";

                    for (int i = 0; i < 30; i++)
                    {
                        bulkCopy.ColumnMappings.Add(i, i); // map First to first_name
                    }

                    bulkCopy.WriteToServer(csvData);

                }
            }
        }
    }

    private TextReader GetData()
    {
        StringBuilder result = new StringBuilder();
        int totalDataLines = 0;
        using (FileStream fs = new FileStream(pathToFile, FileMode.Open, System.IO.FileAccess.Read, FileShare.ReadWrite))
        {
            using (StreamReader sr = new StreamReader(fs))
            {
                string line = string.Empty;
                while ((line = sr.ReadLine()) != null)
                {
                    if (line.StartsWith("D\t"))
                    {
                        totalDataLines++;
                        if (totalDataLines < 100000) // Arbitrary method of restricting how much data is read at once.
                        {
                            result.AppendLine(line);
                        }
                    }
                }
            }
        }
        totalNumberOfLinesProcessed += totalDataLines;
        return new StringReader(result.ToString());
    }

【问题讨论】:

  • 这就是虚拟内存的用途。真正的问题是地址空间。
  • 您是否尝试过使用 FileHelpers 类来解析 CSV 数据? filehelpers.com
  • @Kane,不,我不知道。不过看起来还可以。我的文件一开始就有一些元数据,所以它需要迎合这一点。我会看看一些网站,看看它是否有用。谢谢。

标签: sql-server-2005 csv out-of-memory sqlbulkcopy file-processing


【解决方案1】:

可能不是您正在寻找的答案,但这就是 BULK INSERT 的设计目的。

【讨论】:

  • 是的,我同意。我遇到的问题是,并非文件中的每一行都需要处理。该文件在顶部包含许多元数据标题,并且只处理某些行(以 D\t 开头)。我不确定 BulkInsert 能否帮助我解决这个问题。
  • @MrMoose - 您最好编写一个预处理器来循环浏览您的 CSV 文件并为插入做好准备。
  • 你对预处理器有什么建议?将数据行完全提取到另一个文件,然后使用 SqlBulkCopy/BulkInsert 加载该文件?或者您是否建议预处理器在内存中准备数据。这就是我在上面试图实现的目标,但它似乎不是一种将文件分解成可管理大小的块的不太理想的方法,因此可以处理每个块而不会有内存不足异常的风险。
  • @MrMoose - 在根据欲望或偏好施加人为约束之前,尝试根据时间、环境和性能等外部约束来制定解决方案。如果您正在寻找最小的内存占用,然后创建一个流读取器和一个流写入器,一次从文件中读取一行,过滤它,将其写回新文件,然后使用本机批量上传技术在新处理的文件上将产生最小的内存占用。如果您必须在内存中完成所有操作,那么我会说跳过执行批量插入并一次执行一行。
【解决方案2】:

实际上,您的代码正在从文件中读取所有数据并保存到TextReader(在内存中)。然后你从TextReader读取数据到保存服务器。

如果数据太大,TextReader 中的数据大小会导致内存不足。请尝试这种方式。

1) 从文件中读取数据(每一行)。

2) 然后将每一行插入到Server。

内存不足的问题将得到解决,因为只有在处理内存中的每条记录。

伪代码

begin tran

While (data = FilerReader.ReadLine())
{
  insert into Table[col0,col1,etc] values (data[0], data[1], etc)
}

end tran

【讨论】:

  • 虽然我很欣赏这个答案,但这并不是我想要的。我的示例中的代码是一个修改。我的原始代码将整个文件读到最后(导致 OOM 异常)。我的问题的目的是避免 Row By Agonizing Row (RBAR) 插入,并利用 SQLBulkCopy 的优势快速加载数据。您的解决方案肯定会解决我的 OOM 问题,但它会引入令我不满意的性能问题用户:)
  • 你是对的,它会做一个记录一个事务并在插入时阻塞。在这种情况下,我在每个事务中使用了一批记录。意思是..我在 SQL 数据库中使用 XML 格式在每个事务中插入 100 条记录(例如)。
【解决方案3】:

我只想添加使用 BufferedFileReader 和 readLine 方法,并以上述方式执行。

基本了解这里的职责。

BufferedFileReader 是从文件中读取数据的类(buffe wise) 也应该有一个 LineReader。 CSVReader 是一个实用类,用于读取格式正确的数据。

SQlBulkCopy 你正在使用。

第二个选项

您可以直接进入数据库的导入工具。如果文件格式正确,程序的重点就是这个。那也会更快。

【讨论】:

    【解决方案4】:

    我认为您可能对数据的大小有疑问。每次我遇到这个问题,都不是数据的大小,而是循环数据时创建的对象的数量。

    查看您的 while 循环,在 button3_Click(object sender, EventArgs e) 方法中向数据库添加记录:

    TextReader tr = GetData();
    using (CsvDataReader csvData = new CsvDataReader(tr, '\t'))
    

    在这里,您每次迭代都声明并实例化两个对象——这意味着对于您读取的每个文件块,您将实例化 200,000 个对象;垃圾收集器跟不上。

    为什么不在 while 循环之外声明对象?

    TextReader tr = null;
    CsvDataReader csvData = null;
    

    这样,gc 将有一半的机会。您可以通过对 while 循环进行基准测试来证明差异,毫无疑问,在您创建了几千个对象后,您会注意到性能大幅下降。

    【讨论】:

    • 你在哪里看到我正在创建 200,000 个对象。即使是这样,我也很有信心垃圾收集器可以跟上。我最初的问题是由于将一个完整的文件读入内存,然后还试图将该整个文件绑定到一个数据网格。我现在正在远离它,因为它显然不适用于较大的文件,但我只是好奇地找到一种将大文件加载到数据库中的机制(不必求助于逐行处理)并且没有我遇到了内存问题。
    【解决方案5】:

    伪代码:

    while (!EOF) {
       while (chosenRecords.size() < WRITE_BUFFER_LIST_SIZE) {
          MyRecord record = chooseOrSkipRecord(file.readln());
          if (record != null) {
             chosenRecords.add(record)
          }
       }  
       insertRecords(chosenRecords) // <== writes data and clears the list
    }
    

    WRITE_BUFFER_LIST_SIZE 只是一个您设置的常数...更大意味着更大的批次,更小意味着更小的批次。大小为 1 是 RBAR :)。

    如果您的操作足够大以至于中途失败是一种现实的可能性,或者如果中途失败可能会让某人付出不小的代价,那么您可能还想将处理的记录总数写入第二个表到目前为止从文件(包括您跳过的文件)作为同一事务的一部分,以便您可以在部分完成时从中断的地方继续。

    【讨论】:

    • 这基本上就是我正在做的事情。虽然它有效,但我觉得一定还有更好的方法。我的示例中的任意大小似乎让我觉得它是一个不合标准的解决方案。对我来说,我会以我想一次阅读的最大尺寸为基础。因此,如果我正在处理的文件是 100M 并且我只想一次读取 10M 并且文件中有 1M 行,那么我将执行一些基本计算(即 100M 中的 1M 行,因此大约 100k 行在 10M 中)并以 100k 行的块读取文件。它会起作用,但正如我所说..它看起来很乱,我觉得必须有更好的方法。
    【解决方案6】:

    我建议读取一个块并将其插入数据库,而不是一一读取 csv 行并一一插入到数据库中。重复此过程,直到读取整个文件。

    您可以在内存中缓冲,例如一次 1000 行 csv,然后将它们插入数据库中。

    int MAX_BUFFERED=1000;
    int counter=0;
    List<List<String>> bufferedRows= new ...
    
    while (scanner.hasNext()){
      List<String> rowEntries= getData(scanner.getLine())
      bufferedRows.add(rowEntries);
    
      if (counter==MAX_BUFFERED){
        //INSERT INTO DATABASE
        //append all contents to a string buffer and create your SQL INSERT statement
        bufferedRows.clearAll();//remove data so it could be GCed when GC kicks in
      }
    }
    

    【讨论】:

    • 感谢阿德里安,请参阅下面我对@Gus 的评论。这基本上就是我正在做的事情(使用 SqlBulkCopy 而不是大型 INSERT 语句),但我只是想知道是否有更好的方法。如何确定要读取的最佳行数?取决于很多因素,例如行大小、可用内存大小等。
    • 您需要在您阅读的机器上进行测试。看看它在读取半满内存、几乎满内存、满内存、双倍内存时的表现。
    • 嗯,它将部署在客户端服务器类型的场景中,并将在不同的机器上运行,每台机器都有不同的配置。我不太可能找到适合所有人的方案。我必须选择我认为最佳的东西,然后从 UI 中进行配置。
    • 大概读取 100 行并插入。 Bur 您可以获得空闲内存,并据此估计基于某些基准测试要缓冲的行数。
    猜你喜欢
    • 2013-07-30
    • 2023-03-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-28
    • 2013-08-09
    相关资源
    最近更新 更多