【问题标题】:CsvHelper - Populating datatable slow(ish)CsvHelper - 缓慢填充数据表(ish)
【发布时间】:2021-06-18 01:02:44
【问题描述】:

已将以下代码放在一起以读取一组特定的 CSV 文件。它有效,但在很大程度上是一项正在进行的工作。有一段代码(填充数据表行 - 请参见下面的片段)与 SqlBulkCopy 操作的运行时间一样长。就如何提高性能征求意见/建议。

在代码(如下)中,以 50K 批次处理约 15M 行文件只用了不到 11.5 分钟。分解部分。 SqlBulkCopy 耗时约 236Kms(4 分钟),阅读器仅需 105Kms(约 1.5 分钟),而填充数据表的部分耗时约 200Kms(3.33 分钟)。

     csvTableTimer.Start();
     // Process row and populate datatable
     DataRow dr = dt.NewRow();
 
          foreach (DataColumn dc in dt.Columns)
          {
               if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
               {
                    dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
               }
           }
           dt.Rows.Add(dr);
 
      csvTableTimer.Stop();

CSV 文件非常大(10+GB)并且没有标题。我正在使用 Class 来构建数据表结构,并且喜欢在填充数据表行时继续使用这种方法,因为我需要扩展它以使用多种 CSV 类型。

数据表反映了与 SQL DB 表对齐的类中的列名。曾想使用 GetField(已转换,不是原始)遍历数据表 row[column.ColumnName] = csv.GetField( column.DataType, column.ColumnName ); 中的每一列,但一直收到关于没有标题的错误。发现了一个与 HasHeaderRecord = false 相关的未解决问题,这与我试图做的事情相匹配,这增加了我向那些更熟练的人寻求建议的愿望。感谢您的帮助!

在代码块上展开;

     var rconfig = new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
     {
         BufferSize = 1024,
         Delimiter = ",",
         AllowComments = true,
         HasHeaderRecord = false,
         HeaderValidated = null,
         IgnoreBlankLines = true,
         MissingFieldFound = null,
         Comment = '#',
         Escape = '"',
         TrimOptions = TrimOptions.Trim,
         BadDataFound = x =>
         {
             isBadRecord = true;
             ErrRecords.Add(x.RawRecord);
             ++badCount;
         }
     };

     var loadFType = @"B";
     // Create datatable using class as definition.
     PropertyDescriptorCollection props1 = TypeDescriptor.GetProperties(loaderFileType);
     DataTable dt = new DataTable();
     dt = UtilExtensions.CreateDataTable(props1);

     using (var reader = new StreamReader(rFile))
     {
         reader.ReadLine();
          
         using (var csv = new CsvReader(reader, rconfig))
         {
             switch (loadFType)
             {
                 case "ALL":
                     csv.Context.RegisterClassMap<CSVLoader.AMap>();
                     var allRecords = new List<CSVLoader.A>();
                     break;
                 case "BAL":
                     csv.Context.RegisterClassMap<CSVLoader.BMap>();
                     var balRecords = new List<CSVLoader.B>();
                     break;

                 case "CIF":
                     csv.Context.RegisterClassMap<CSVLoader.CMap>();
                     var cifRecords = new List<CSVLoader.C>();
                     break;
             }

             dt.BeginLoadData();
             while (csv.Read())
             {
                 csvReadTimer.Start();
                 var row = csv.GetRecord(loaderFileType);
                 csvReadTimer.Stop();

                 runningCount++;

                 if (!isBadRecord)
                 {
                      csvTableTimer.Start();
                      // Process row and populate datatable
                      DataRow dr = dt.NewRow();

                      foreach (DataColumn dc in dt.Columns)
                      {
                          if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
                          {
                              dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
                          }
                      }
                     dt.Rows.Add(dr);

                     csvTableTimer.Stop();
                     ++goodCount;

                     if (batchCount >= dtbatchSize || runningCount >= fileRecCount)
                     {
                         try
                         {
                             // Write from the source to the destination.
                             bcpLoadTimer.Start();

                             bulkCopy.WriteToServer(dt);

                             bcpLoadTimer.Stop();
                             bcpLoadBatchCount++;

                         }
                         catch (Exception ex)
                         {
                         }
                         dt.Clear();
                         batchCount = 0;
                     }
                     batchCount++;
                 }
                 isBadRecord = false;
             }
             dt.EndLoadData();
             reader.Close();
             dt.Clear();
             transaction.Commit();
// B
public class B
{
    [Index(0)]
    public string A { get; set; }
    [Index(1)]
    public string BString { get; set; }
    [Index(2)]
    public int? C { get; set; }
    [Index(3)]
    public string D { get; set; }
    [Index(4)]
    public string E { get; set; }
    [Index(5)]
    public DateTime? F { get; set; }
    [Index(6)]
    public decimal? G { get; set; }
    [Index(7)]
    public decimal? H { get; set; }
    [Index(8)]
    public decimal? I { get; set; }
    [Index(9)]
    public decimal? J { get; set; }
    [Index(10)]
    public int? K { get; set; }
    [Index(11)]
    public string L { get; set; }
    [Index(12)]
    public DateTime? M { get; set; }
}

// B
public sealed class BMap : ClassMap<B>
{
    public BMap()
    {
        // AutoMap(CultureInfo.InvariantCulture);
        Map(m => m.A).Index(0);
        Map(m => m.BString).Index(1); 
        Map(m => m.C).Index(2);
        Map(m => m.D).Index(3);
        Map(m => m.E).Index(4);
        Map(m => m.F).Index(5).TypeConverterOption.Format("yyyyMMdd");
        Map(m => m.G).Index(6);
        Map(m => m.H).Index(7);
        Map(m => m.I).Index(8);
        Map(m => m.J).Index(9);
        Map(m => m.K).Index(10);
        Map(m => m.L).Index(11);
        Map(m => m.M).Index(12).TypeConverterOption.Format("yyyy-MM-dd-hh.mm.ss.ffffff");
    }
}

【问题讨论】:

标签: c# performance datatable csvhelper


【解决方案1】:

您的问题实际上并不包含minimal reproducible example,因此我简化了您的代码以创建以下FileLoader 类,该类计算从某个类TClass 的实例填充DataTable 所需的时间(这里B) 已使用 CsvReader 从 CSV 行中读取:

public class FileLoader
{
    public System.Diagnostics.Stopwatch csvTableTimer { get; } = new();

    public long Load<TClass, TClassMap>(string rFile, int dtbatchSize) where TClassMap : ClassMap<TClass>, new()
    {
        bool isBadRecord = false;
        long badCount = 0;
        long runningCount = 0;
        long goodCount = 0;
        long batchCount = 0;

        var rconfig = CreateCsvConfiguration(
            x => 
            {
                isBadRecord = true;
                //ErrRecords.Add(x.RawRecord);
                ++badCount;
            });
        
        // Create datatable using class as definition.
        var dt = UtilExtensions.CreateDataTable(typeof(TClass));

        using (var reader = new StreamReader(rFile))
        {
            //reader.ReadLine();  FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
            using (var csv = new CsvReader(reader, rconfig))
            {
                csv.Context.RegisterClassMap<TClassMap>();

                dt.BeginLoadData();
                while (csv.Read())
                {
                    isBadRecord = false;
                    //csvReadTimer.Start();
                    var record = csv.GetRecord<TClass>();
                    //csvReadTimer.Stop();

                    runningCount++;
                    if (!isBadRecord)
                    {
                        csvTableTimer.Start();
                        // Process row and populate datatable
                        DataRow dr = dt.NewRow();
                        foreach (DataColumn dc in dt.Columns)
                        {
                            if (record.GetType().GetProperty(dc.ToString()).GetValue(record) != null)
                            {
                                dr[dc.ToString()] = record.GetType().GetProperty(dc.ToString()).GetValue(record);
                            }
                        }
                        dt.Rows.Add(dr);
                        csvTableTimer.Stop();
                        goodCount++;
                        if (++batchCount >= dtbatchSize)
                        {
                            // Flush the data table
                            FlushTable(dt);
                            batchCount = 0;
                        }
                    }
                }
                dt.EndLoadData();
                FlushTable(dt);
                Commit();
            }
        }
        
        return goodCount;
    }

    protected virtual void FlushTable(DataTable dt) => dt.Clear();  // Replace with SqlBulkCopy 
    protected virtual void Commit() {} // Replace with transaction.Commit();
    
    public static CsvConfiguration CreateCsvConfiguration(BadDataFound badDataFound) => 
        new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
        {
            BufferSize = 1024,
            Delimiter = ",",
            AllowComments = true,
            HasHeaderRecord = false,
            HeaderValidated = null,
            IgnoreBlankLines = true,
            MissingFieldFound = null,
            Comment = '#',
            Escape = '"',
            TrimOptions = TrimOptions.Trim,
            BadDataFound = badDataFound,
        };
}

public static partial class UtilExtensions
{
    static IEnumerable<PropertyInfo> GetSerializableProperties(this Type type) => 
        type.GetProperties().Where(p => p.GetIndexParameters().Length == 0 && p.CanRead && p.CanWrite && p.GetGetMethod() != null && p.GetSetMethod() != null);
    
    public static DataTable CreateDataTable(Type type)
    {
        var dt = new DataTable();
        foreach (var p in type.GetSerializableProperties())
            dt.Columns.Add(p.Name, Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType);
        return dt;
    }
}

然后,如果我使用文件加载器并调用 loader.Load&lt;B, BMap&gt;(rFile, 1000) 来读取 5555 行的 CSV 文件 20 次,则在 dotnetfiddle 上大约需要 1049 毫秒。请参阅演示 #1 here

您遇到的一个问题是 c# 中的反射可能非常慢。你调用record.GetType().GetProperty(dc.ToString()).GetValue(record) 两次,如果我简单地将调用次数减少1,时间会减少到706 ms 左右:

                        foreach (DataColumn dc in dt.Columns)
                        {
                            var value = record.GetType().GetProperty(dc.ToString()).GetValue(record);
                            if (value != null)
                            {
                                dr[dc.ToString()] = value;
                            }
                        }

演示 #2 here.

但是,我们可以通过在运行时制造委托来做得更好。首先,添加以下利用 System.Linq.Expressions 命名空间的实用方法:

public static partial class UtilExtensions
{
    public static Func<TSource, object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
    {
        var parameter = Expression.Parameter(typeof(TSource), "obj");
        var property = Expression.Property(parameter, propertyInfo);
        var convert = Expression.Convert(property, typeof(object));
        var lambda = Expression.Lambda(typeof(Func<TSource, object>), convert, parameter);

        return (Func<TSource, object>)lambda.Compile();
    }

    public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters<TSource>() => PropertyExpressionsCache<TSource>.PropertyGetters;

    static ReadOnlyDictionary<string, Func<TSource, object>> CreatePropertyGetters<TSource>() =>
        typeof(TSource)
            .GetSerializableProperties()
            .ToDictionary(p => p.Name,
                          p => CreatePropertyGetter<TSource>(p))
            .ToReadOnly();

    static class PropertyExpressionsCache<TSource>
    {
        public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters { get; } = UtilExtensions.CreatePropertyGetters<TSource>();
    }
    
    public static ReadOnlyDictionary<TKey, TValue> ToReadOnly<TKey, TValue>(this IDictionary<TKey, TValue> dictionary) => 
        new ReadOnlyDictionary<TKey, TValue>(dictionary ?? throw new ArgumentNullException());
}

并修改Load&lt;TClass, TClassMap&gt;()如下:

public long Load<TClass, TClassMap>(string rFile, int dtbatchSize) where TClassMap : ClassMap<TClass>, new()
{
    bool isBadRecord = false;
    long badCount = 0;
    long runningCount = 0;
    long goodCount = 0;
    long batchCount = 0;

    var rconfig = CreateCsvConfiguration(
        x => 
        {
            isBadRecord = true;
            //ErrRecords.Add(x.RawRecord);
            ++badCount;
        });
    
    var loaderFileType = typeof(TClass);

    // Create datatable using class as definition.
    var dt = UtilExtensions.CreateDataTable(loaderFileType);
    var properties = UtilExtensions.PropertyGetters<TClass>();

    using (var reader = new StreamReader(rFile))
    {
        //reader.ReadLine();  FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
        using (var csv = new CsvReader(reader, rconfig))
        {
            csv.Context.RegisterClassMap<TClassMap>();

            dt.BeginLoadData();
            while (csv.Read())
            {
                isBadRecord = false;
                //csvReadTimer.Start();
                var record = csv.GetRecord<TClass>();
                //csvReadTimer.Stop();

                runningCount++;
                if (!isBadRecord)
                {
                    csvTableTimer.Start();
                    // Process row and populate datatable
                    DataRow dr = dt.NewRow();
                    foreach (var p in properties)
                    {
                        var value = p.Value(record);
                        if (value != null)
                            dr[p.Key] =  value;
                    }
                    dt.Rows.Add(dr);
                    csvTableTimer.Stop();
                    goodCount++;
                    if (++batchCount >= dtbatchSize)
                    {
                        // Flush the data table
                        FlushTable(dt);
                        batchCount = 0;
                    }
                }
            }
            dt.EndLoadData();
            FlushTable(dt);
        }
    }
    
    return goodCount;
}

时间将进一步减少,大约为 404 毫秒。演示小提琴 #3 here.

我也尝试使用Delegate.CreateDelegate() 而不是Expression

public static partial class UtilExtensions
{
    static Func<TSource, object> CreateTypedPropertyGetter<TSource, TValue>(PropertyInfo propertyInfo)
    {
        var typedFunc = (Func<TSource, TValue>)Delegate.CreateDelegate(typeof(Func<TSource, TValue>), propertyInfo.GetGetMethod());
        return i => (object)typedFunc(i);
    }

    public static Func<TSource, object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
    {
        var typedCreator = typeof(UtilExtensions).GetMethod(nameof(CreateTypedPropertyGetter), BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic);
        var concreteTypedCreator = typedCreator = typedCreator.MakeGenericMethod(typeof(TSource), propertyInfo.PropertyType);
        return (Func<TSource, object>)concreteTypedCreator.Invoke(null, new object [] { propertyInfo });
    }

    public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters<TSource>() => PropertyExpressionsCache<TSource>.PropertyGetters;

    static ReadOnlyDictionary<string, Func<TSource, object>> CreatePropertyGetters<TSource>() =>
        typeof(TSource)
            .GetSerializableProperties()
            .ToDictionary(p => p.Name,
                          p => CreatePropertyGetter<TSource>(p))
            .ToReadOnly();

    static class PropertyExpressionsCache<TSource>
    {
        public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters { get; } = UtilExtensions.CreatePropertyGetters<TSource>();
    }
    
    public static ReadOnlyDictionary<TKey, TValue> ToReadOnly<TKey, TValue>(this IDictionary<TKey, TValue> dictionary) => 
        new ReadOnlyDictionary<TKey, TValue>(dictionary ?? throw new ArgumentNullException());
}

得到的时间大致相同,为 410 毫秒。演示小提琴 #4 here.

注意事项:

  • 您问题中的代码通过调用 reader.ReadLine(); 跳过了 CSV 文件的第一行。

    在我的测试工具中,这导致读取的记录数不正确,因此我删除了这一行。

  • 我没有使用具有记录类型开关的非泛型方法,而是提取了一个将记录类型和类映射类型作为泛型参数的泛型方法。这使得委托创建更容易一些,因为不再需要对记录类型进行运行时强制转换。

【讨论】:

  • dbc,从我的代码中可以明显看出,我对 C# 非常熟悉。非常感谢您的时间和详细信息。我会花一点时间从你提供的东西中学习。关于底部的 2 个注释, reader.ReadLine() 将跳过单独进程使用的文件标题(无列标题)。记录类型的开关,这几乎正是我希望学习的。再次TY!
  • @MrAC - 不客气。如果TClass 类型基于文件扩展名,您可以在代码的最顶层打开扩展名,然后使用适当的&lt;TClass, TClassMap&gt; 调用泛型。让我们知道这节省了多少时间来加载您的真实 CSV。如果问题得到解答,请mark it as such
  • 抱歉,暂时无法关闭。有点忙。场景数据表行填充var value = record.GetType().GetProperty(dc.ToString()).GetValue(record); 中的一行更改导致 14% 的改进。 873 万行从 20Krps 变为 24Krps 将始终处于循环中。实施 Linq 解决方案需要我做更多的工作和学习。现在在支持电话之间进行处理。将尽快发布结果(时间)标记案例。再次感谢。
  • 最后一次更新感谢@dbc 的出色输入和详细信息。我最终做出的两个最大的改变是减少反射,然后异步运行批量复制。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-11-11
  • 2012-04-30
  • 1970-01-01
  • 1970-01-01
  • 2014-11-05
  • 2017-12-22
  • 2015-03-27
相关资源
最近更新 更多