【问题标题】:How to aggregate millions of rows using EF Core如何使用 EF Core 聚合数百万行
【发布时间】:2019-01-31 04:56:03
【问题描述】:

我正在尝试根据用户聚合大约 200 万行。 一个用户有多个事务,每个事务都有一个平台和一个事务类型。我将平台和事务类型列聚合为 json 并保存为一行。

但是我的代码很慢。 如何提高性能?

  public static void AggregateTransactions()
        {
            using (var db = new ApplicationDbContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;

                //Get a list of users who have transactions  
                var users = db.Transactions
                   .Select(x => x.User)
                   .Distinct();

                foreach (var user in users.ToList())
                {
                    //Get all transactions for a particular user
                    var _transactions = db.Transactions
                        .Include(x => x.Platform)
                        .Include(x => x.TransactionType)
                        .Where(x => x.User == user)
                        .ToList();

//Aggregate Platforms from all transactions for user
                    Dictionary<string, int> platforms = new Dictionary<string, int>();

                    foreach (var item in _transactions.Select(x => x.Platform).GroupBy(x => x.Name).ToList())
                    {
                        platforms.Add(item.Key, item.Count());
                    };

//Aggregate TransactionTypes from all transactions for user
                   Dictionary<string, int> transactionTypes = new Dictionary<string, int>();

                    foreach (var item in _transactions.Select(x => x.TransactionType).GroupBy(x => x.Name).ToList())
                    {
                        transactionTypes.Add(item.Key, item.Count());
                    };


                    db.Add<TransactionByDay>(new TransactionByDay
                    {
                        User = user,
                        Platforms = platforms,     //The dictionary list is represented as json in table
                        TransactionTypes = transactionTypes     //The dictionary list is represented as json in table
                    });

                    db.SaveChanges();

                }

            }

        }

更新

因此数据的基本视图如下所示:

交易数据:

编号:b11c6b67-6c74-4bbe-f712-08d609af20cf, 用户 ID:1, 平台ID:3, TransactiontypeId: 1

ID:4782803f-2f6b-4d99-f717-08d609af20cf, 用户 ID:1, 平台ID:3, TransactiontypeId: 4

将数据聚合为 TransactionPerDay:

ID:9df41ef2-2fc8-441b-4a2f-08d609e21559, 用户 ID:1, 平台:{“p3”:2}, 交易类型:{"t1":1,"t4":1}

所以在这种情况下,两个事务被聚合为一个。可以看到平台和交易类型会聚合成json。

【问题讨论】:

  • 在您的代码中,您似乎在一次又一次地做同样的事情。除了速度慢之外,其目的也是不可理解的。您能否提供一部分数据作为样本以及您真正想要做的事情。
  • 您的_transactions 查询与user 无关。这是故意的吗?对我来说没有意义。
  • @CetinBasoz 我更新了我的问题
  • @IvanStoev 抱歉,这是一个错字。已更新,谢谢。
  • 我在代码中添加了一些 cmets,希望它能让我的意图更加清晰。

标签: c# sql-server entity-framework .net-core


【解决方案1】:

您可能不应该在循环中调用 db.saveChanges()。将其置于循环之外以将更改持久化一次,可能会有所帮助。

但话虽如此,当处理大量数据和性能是关键时,我发现 ADO.NET 可能是更好的选择。这并不意味着您必须停止使用 Entity Framework,但也许对于这种方法,您可以使用 ADO.NET。如果你走这条路,你可以:

  1. 创建一个存储过程以返回您需要处理的数据、填充数据表、操作数据并使用 sqlBulkCopy 批量保存所有内容。

  2. 使用存储过程来完全执行此操作。这避免了将数据传送到您的应用程序的需要,并且整个处理都可以在数据库本身内进行。

【讨论】:

  • 我用 db.saveChanges() 尝试了几种方法。我在循环外有 db.saveChanges() 和循环内的 if (i % 1000 == 0){db.saveChanges()} ,但没有看到太大的区别。我会仔细看看你提出的方案,谢谢。
  • 请随时向我们发布并分享您的发现。如果我能够提供帮助,请为答案投票。谢谢!
  • 为基本的 ado.net 推荐点赞。
  • 我将使用存储过程。只需要弄清楚如何......我会问一个新问题。谢谢。
【解决方案2】:

Linq To EF 不是为速度而构建的(LinqToSQL 更简单、更快,恕我直言,或者您可以使用 Linq EF\SQL 运行直接 SQL 命令)。无论如何,我不知道这会如何加快速度:

    using (var db = new MyContext(connectionstring))
    {

        var tbd = (from t in db.Transactions
                    group t by t.User
                    into g
                    let platforms = g.GroupBy(tt => tt.Platform.Name)
                    let trantypes = g.GroupBy(tt => tt.TransactionType.Name)
                    select new {
                       User = g.Key,
                       Platforms = platforms, 
                       TransactionTypes = trantypes 
                    }).ToList()
                    .Select(u => new TransactionByDay {
                        User=u.User, 
                        Platforms=u.Platforms.ToDictionary(tt => tt.Key, tt => tt.Count()),
                        TransactionTypes = u.TransactionTypes.ToDictionary(tt => tt.Key, tt => tt.Count())
                    });
 //...
}

【讨论】:

    【解决方案3】:

    这个想法是通过首先获取尽可能多的数据来尝试减少查询和包含。因此,无需在每个事务中包含PlatformTransactionType,您只需在Dictionary 中查询它们一次并查找数据即可。此外,我们还可以并行处理,然后一次保存所有数据。

        public static void AggregateTransactions()
        {
            using (var db = new ApplicationDbContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;
    
                //Get a list of users who have transactions  
                var transactionsByUser = db.Transactions
                   .GroupBy(x => x.User) //Not sure if EF Core supports this kind of grouping
                   .ToList();
    
                var platforms = db.Platforms.ToDictionary(ks => ks.PlatformId);
                var Transactiontypes = db.TransactionTypes.ToDictionary(ks => ks.TransactionTypeId);
                var bag = new ConccurentBag<TransactionByDay>();
    
                Parallel.ForEach(transactionsByUser, transaction => 
                {
                    //Aggregate Platforms from all transactions for user
                    Dictionary<string, int> platforms = new Dictionary<string, int>(); //This can be converted to a ConccurentDictionary
    
                    //This can be converted to Parallel.ForEach
                    foreach (var item in _transactions.Select(x => platforms[x.PlatformId]).GroupBy(x => x.Name).ToList())
                    {
                        platforms.Add(item.Key, item.Count());
                    };
    
                   //Aggregate TransactionTypes from all transactions for user
                   Dictionary<string, int> transactionTypes = new Dictionary<string, int>(); //This can be converted to a ConccurentDictionary
    
                    //This can be converted to Parallel.ForEach
                    foreach (var item in _transactions.Select(x => Transactiontypes[c.TransactionTypeId]).GroupBy(x => x.Name).ToList())
                    {
                        transactionTypes.Add(item.Key, item.Count());
                    };
    
                    bag.Add(new TransactionByDay
                    {
                        User = transaction.Key,
                        Platforms = platforms,     //The dictionary list is represented as json in table
                        TransactionTypes = transactionTypes     //The dictionary list is represented as json in table
                    });
                });
    
                //Before calling this we may need to check the status of the Parallel ForEach, or just convert it back to regular foreach loop if you see no benefit.
                db.AddRange(bag);
                db.SaveChanges();
            }
        }
    

    变体 #2

        public static void AggregateTransactions()
        {
            using (var db = new ApplicationDbContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;
    
                //Get a list of users who have transactions  
                var users = db.Transactions
                   .Select(x => x.User)
                   .Distinct().ToList();
    
                var platforms = db.Platforms.ToDictionary(ks => ks.PlatformId);
                var Transactiontypes = db.TransactionTypes.ToDictionary(ks => ks.TransactionTypeId);
                var bag = new ConccurentBag<TransactionByDay>();
    
                Parallel.ForEach(users, user => 
                {
                    var _transactions = db.Transactions
                    .Where(x => x.User == user)
                    .ToList();
    
                    //Aggregate Platforms from all transactions for user
                    Dictionary<string, int> userPlatforms = new Dictionary<string, int>();
                    Dictionary<string, int> userTransactions = new Dictionary<string, int>();
    
                    foreach(var transaction in _transactions)
                    {
                       if(platforms.TryGetValue(transaction.PlatformId, out var platform))
                       {
                           if(userPlatforms.TryGetValue(platform.Name, out var tmp))
                           {
                               userPlatforms[platform.Name] = tmp + 1;
                           }
                           else
                           {
                               userPlatforms.Add(platform.Name, 1);
                           }
                       }
    
                       if(Transactiontypes.TryGetValue(transaction.TransactionTypeId, out var type))
                       {
                           if(userTransactions.TryGetValue(type.Name, out var tmp))
                           {
                               userTransactions[type.Name] = tmp + 1;
                           }
                           else
                           {
                               userTransactions.Add(type.Name, 1);
                           }
                       }
                    }
    
                    bag.Add(new TransactionByDay
                    {
                        User = user,
                        Platforms = userPlatforms,     //The dictionary list is represented as json in table
                        TransactionTypes = userTransactions     //The dictionary list is represented as json in table
                    });
    
                });
    
                db.AddRange(bag);
                db.SaveChanges();
    
            }
        }
    

    【讨论】:

    • 我选择将查询拆分为每个用户,因为将所有事务加载到一个列表中,就像您拥有的那样(可能是数百万)会大大影响性能。
    • 我明白了,您是否考虑过按块而不是按用户进行查询(假设您每 10K 事务访问数据库)?
    • 我已经用第二个变体更新了答案,它可以获取每个用户的数据,它可能会激发你做类似甚至不同的事情。玩得开心!
    • 这里的瓶颈可能只是你有很多数据要同时获取和保存,你最终可能只是把这些全部交给 SQL 和/或 ADO 并将这个逻辑放在存储过程中.您还可以考虑让这个任务在调度程序上每晚运行,或者甚至不是每天处理它,而是全天分块处理它,这样您可以处理的数据更少。
    • 谢谢。我将使用存储过程。 +1
    猜你喜欢
    • 1970-01-01
    • 2015-06-28
    • 2020-10-05
    • 2015-09-27
    • 2020-07-23
    • 2020-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多