【问题标题】:How to improve performance of CSV upload via datatable如何通过数据表提高 CSV 上传的性能
【发布时间】:2023-03-22 08:59:01
【问题描述】:

我有一个上传 CSV 文件的有效解决方案。目前,我使用IFormCollection 让用户从一个视图上传多个 CSV 文件。

CSV 文件保存为临时文件,如下所示:

List<string> fileLocations = new List<string>();
foreach (var formFile in files)
{
   filePath = Path.GetTempFileName();    
   if (formFile.Length > 0)
   {
       using (var stream = new FileStream(filePath, FileMode.Create))
       {
           await formFile.CopyToAsync(stream);
       }
   }

   fileLocations.Add(filePath);
}

我将文件位置列表发送到另一种方法(就在下面)。我遍历文件位置并从临时文件中流式传输数据,然后使用数据表和SqlBulkCopy 插入数据。我目前一次上传 50 到 200 个文件,每个文件大约 330KB。插入一百个,大约需要 6 分钟,也就是 30-35MB 左右。

public void SplitCsvData(string fileLocation, Guid uid)
        {
            MetaDataModel MetaDatas;
            List<RawDataModel> RawDatas;

            var reader = new StreamReader(File.OpenRead(fileLocation));
            List<string> listRows = new List<string>();
            while (!reader.EndOfStream)
            {
                listRows.Add(reader.ReadLine());
            }

            var metaData = new List<string>();
            var rawData = new List<string>();

            foreach (var row in listRows)
            {
                var rowName = row.Split(',')[0];
                bool parsed = int.TryParse(rowName, out int result);

                if (parsed == false)
                {
                    metaData.Add(row);
                }
                else
                {
                    rawData.Add(row);
                }
            }

         //Assigns the vertical header name and value to the object by splitting string 
         RawDatas = GetRawData.SplitRawData(rawData);
         SaveRawData(RawDatas);

         MetaDatas = GetMetaData.SplitRawData(rawData);
         SaveRawData(RawDatas);

        }

然后此代码将对象传递给 以创建数据表并插入数据。

private DataTable CreateRawDataTable
{
   get
   {
       var dt = new DataTable();
       dt.Columns.Add("Id", typeof(int));
       dt.Columns.Add("SerialNumber", typeof(string));
       dt.Columns.Add("ReadingNumber", typeof(int));
       dt.Columns.Add("ReadingDate", typeof(string));
       dt.Columns.Add("ReadingTime", typeof(string));
       dt.Columns.Add("RunTime", typeof(string));
       dt.Columns.Add("Temperature", typeof(double));
       dt.Columns.Add("ProjectGuid", typeof(Guid));
       dt.Columns.Add("CombineDateTime", typeof(string));

        return dt;
  }
}

public void SaveRawData(List<RawDataModel> data)
{
       DataTable dt = CreateRawDataTable;
       var count = data.Count;          

       for (var i = 1; i < count; i++)
       {
           DataRow row = dt.NewRow();
           row["Id"] = data[i].Id;
           row["ProjectGuid"] = data[i].ProjectGuid;
           row["SerialNumber"] = data[i].SerialNumber;
           row["ReadingNumber"] = data[i].ReadingNumber;
           row["ReadingDate"] = data[i].ReadingDate;
           row["ReadingTime"] = data[i].ReadingTime;
           row["CombineDateTime"] = data[i].CombineDateTime;
           row["RunTime"] = data[i].RunTime;
           row["Temperature"] = data[i].Temperature;
           dt.Rows.Add(row);
        }

        using (var conn = new SqlConnection(connectionString))
        {
           conn.Open();
           using (SqlTransaction tr = conn.BeginTransaction())
           {
               using (var sqlBulk = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, tr))
               {
                   sqlBulk.BatchSize = 1000;
                   sqlBulk.DestinationTableName = "RawData";
                   sqlBulk.WriteToServer(dt);
               }
               tr.Commit();
           }
       }
   }

是否有其他方法或更好的方法来提高性能,从而减少上传时间,因为上传时间可能很长,而且我发现内存使用量不断增加至 500MB 左右。

TIA

【问题讨论】:

  • 通过摆脱数据表。现在,您正在将整个表加载到内存中,然后对其进行解析并在内存中制作 another 副本,并且仅在最后将表写入数据库。 WriteToServer 也可以接受 DbDataReader。如果您找到一种在文件顶部创建数据读取器的方法,您将能够将文件中的记录直接泵送到 SqlBulkCopy
  • 不要填写List&lt;string&gt;,而是直接填写DataTable。如果您在SaveRawData 中有一个BatchSize,则将其设为一个字段并在while 循环中检查DataTable.Rows.Count==MaxBatchSize。然后你可以把它传递给SaveRawData。通过它后,创建一个新的空表。这样,您的内存中就只有您当前正在处理的内容

标签: c# csv datatable asp.net-core-2.0 sqlbulkcopy


【解决方案1】:

您可以通过删除 DataTable 并直接从输入流中读取来提高性能。

SqlBulkCopy 有一个WriteToServer 重载,它接受 IDataReader 而不是整个 DataTable。

CsvHelper 可以使用 StreamReader 作为输入来 CSV 文件。它提供 CsvDataReader 作为 CSV 数据之上的 IDataReader 实现。这允许直接从输入流中读取并写入 SqlBulkCopy。

以下方法将从IFormFile 中读取,使用 CsvHelper 解析流并使用 CSV 的字段来配置 SqlBulkCopy 实例:

public async Task ToTable(IFormFile file, string table)
{
    using (var stream = file.OpenReadStream())
    using (var tx = new StreamReader(stream))
    using (var reader = new CsvReader(tx))
    using (var rd = new CsvDataReader(reader))
    {
        var headers = reader.Context.HeaderRecord;

        var bcp = new SqlBulkCopy(_connection)
        {
            DestinationTableName = table
        };
        //Assume the file headers and table fields have the same names
        foreach(var header in headers)
        {
            bcp.ColumnMappings.Add(header, header);
        }

        await bcp.WriteToServerAsync(rd);                
    }
}

这种方式不会将任何内容写入临时表或缓存在内存中。上传的文件直接解析并写入数据库。

【讨论】:

  • 我已经更新了我的部分问题,部分原因(我应该在一开始就说)是我必须将 CSV 文件分成两部分,第一组行是元数据和其余的行是原始数据,它们由以数字开头的原始数据行定义。标题是垂直的而不是水平的。这是否意味着我必须将 IFormFile 保存为两个部分才能使用您的解决方案。
  • @TheOrangeGoblin 这不是 CSV 文件。那是完全不同的东西,您应该在问题中解释这一点,以及文件样本。您仍然不需要 DataTable 和多个内存副本。您可以使解析器成为迭代器方法并使用 Fast-Member 库中的 ObjectReader 使其看起来像 IDataReader
  • 我尝试安装 Fast-Member 但由于某种原因它无法在 netcore2.1 上运行,依赖项似乎不正确。我也尝试了其他方法,无论是本地还是远程都需要同样长的时间。回到绘图板。
  • @TheOrangeGoblin 它在 .NET Core 2.1 上运行良好,这就是我现在使用的。 it takes just as long 那么您可能会遇到与您的代码完全无关的问题。复制 3 次总是比不创建任何副本要慢。您是否尝试过使用bcpBULK INSERT?它有多快/多慢?如果bcp 很慢,SqlBulkCopy 也会很慢
  • 谢谢,由于您的意见,我设法让它工作。我已经发布了这个问题的答案...
【解决方案2】:

除了@Panagiotis 的回答,为什么不将文件处理与文件上传交错?将您的文件处理逻辑包装在异步方法中并将循环更改为 Parallel.Foreach 并在每个文件到达时对其进行处理,而不是等待所有文件?

private static readonly object listLock = new Object(); // only once at class level


    List<string> fileLocations = new List<string>();
    Parallel.ForEach(files, (formFile) => 
    {
       filePath = Path.GetTempFileName();    
       if (formFile.Length > 0)
       {
           using (var stream = new FileStream(filePath, FileMode.Create))
           {
               await formFile.CopyToAsync(stream);
           }

           await ProcessFileInToDbAsync(filePath); 
       }

       // Added lock for thread safety of the List 
       lock (listLock)
       {
           fileLocations.Add(filePath);
       }     
    });

【讨论】:

  • 这不会交错处理。它异步执行每个操作,但它们仍然一次完成一个
  • 将其调整为 Parallel.ForEach
【解决方案3】:

感谢@Panagiotis Kanavos,我能够弄清楚该怎么做。首先,我调用方法的方式是将它们留在内存中。我拥有的 CSV 文件分为两部分,垂直元数据和通常的水平信息。所以我需要把它们分成两部分。将它们保存为 tmp 文件也会导致开销。它从需要 5-6 分钟到现在需要 1 分钟,我想这对于包含 8,500 行的 100 个文件来说还不错。

调用方法:

public async Task<IActionResult> UploadCsvFiles(ICollection<IFormFile> files, IFormCollection fc)
{
   foreach (var f in files)
   {
       var getData = new GetData(_configuration);
       await getData.SplitCsvData(f, uid);
   }

   return whatever;
}

这是进行拆分的方法:

public async Task SplitCsvData(IFormFile file, string uid)
    {
        var data = string.Empty;
        var m = new List<string>();
        var r = new List<string>();

        var records = new List<string>();
        using (var stream = file.OpenReadStream())
        using (var reader = new StreamReader(stream))
        {
            while (!reader.EndOfStream)
            {
                var line = reader.ReadLine();
                var header = line.Split(',')[0].ToString();
                bool parsed = int.TryParse(header, out int result);
                if (!parsed)
                {
                    m.Add(line);
                }
                else
                {
                    r.Add(line);
                }
            }
        }

        //TODO: Validation
        //This splits the list into the Meta data model. This is just a single object, with static fields.
        var metaData = SplitCsvMetaData.SplitMetaData(m, uid);
        DataTable dtm = CreateMetaData(metaData);
        var serialNumber = metaData.LoggerId;
        await SaveMetaData("MetaData", dtm);

        //
        var lrd = new List<RawDataModel>();
        foreach (string row in r)
        {
            lrd.Add(new RawDataModel
            {
                Id = 0,
                SerialNumber = serialNumber,
                ReadingNumber = Convert.ToInt32(row.Split(',')[0]),
                ReadingDate = Convert.ToDateTime(row.Split(',')[1]).ToString("yyyy-MM-dd"),
                ReadingTime = Convert.ToDateTime(row.Split(',')[2]).ToString("HH:mm:ss"),
                RunTime = row.Split(',')[3].ToString(),
                Temperature = Convert.ToDouble(row.Split(',')[4]),
                ProjectGuid = uid.ToString(),
                CombineDateTime = Convert.ToDateTime(row.Split(',')[1] + " " + row.Split(',')[2]).ToString("yyyy-MM-dd HH:mm:ss")
            });
        }

        await SaveRawData("RawData", lrd);
    }

然后,我将数据表用于元数据(100 个文件需要 20 秒),因为我将字段名称映射到列。

 public async Task SaveMetaData(string table, DataTable dt)
    {
        using (SqlBulkCopy sqlBulk = new SqlBulkCopy(_configuration.GetConnectionString("DefaultConnection"), SqlBulkCopyOptions.Default))
        { 
            sqlBulk.DestinationTableName = table;
            await sqlBulk.WriteToServerAsync(dt);
        }
    }

然后我将 FastMember 用于原始数据的大数据部分,这更像是传统的 CSV。

 public async Task SaveRawData(string table, IEnumerable<LogTagRawDataModel> lrd)
    {
        using (SqlBulkCopy sqlBulk = new SqlBulkCopy(_configuration.GetConnectionString("DefaultConnection"), SqlBulkCopyOptions.Default))
        using (var reader = ObjectReader.Create(lrd, "Id","SerialNumber", "ReadingNumber", "ReadingDate", "ReadingTime", "RunTime", "Temperature", "ProjectGuid", "CombineDateTime"))
        {                
            sqlBulk.DestinationTableName = table;
            await sqlBulk.WriteToServerAsync(reader);
        }  
    }

我相信这可以改进,但现在,这真的很好。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-10-24
    • 2017-05-23
    • 2021-01-13
    • 2017-01-28
    • 2019-11-12
    • 2014-11-21
    • 1970-01-01
    • 2019-05-03
    相关资源
    最近更新 更多