【问题标题】:Optimizing DB inserts in .NET parallel processing在 .NET 并行处理中优化 DB 插入
【发布时间】:2016-06-28 07:01:26
【问题描述】:

我必须使用 AWS SES 发送大量电子邮件(比如每个工作 10,000 封)。找到a great blog 了解如何并行执行此操作,现在有一个关于如何将发送事务数据写入数据库的问题。我正在使用 npoco ORM 和 InsertBulk,粗略地看它打开一个连接并通过迭代每个 poco 来插入,然后关闭连接。每次发送都没有打开、写入和关闭,这是一个进步。我的想法是将数据库操作保持在最低限度,但我应该每隔 50 左右的电子邮件发送一次写入数据库,以防服务器或作业中断,该作业稍后可以从中断的地方继续,而不会发送重复等.

所以我开始使用 ConcurrentBag、线程锁定、转换为列表、将该列表发送到 npoco 以插入等。经过非常有限的测试并且它可以工作。但我确信这不是正确的方法,而且我没有信心在这里适当地使用线程。在这种情况下有什么建议?将concurrentbag传递给npoco进行插入,其他一些插入方法会更好还是可行?

 var bag = new ConcurrentBag<EmailSent>();
    Parallel.ForEach(recipients.AsParallel(), new ParallelOptions { MaxDegreeOfParallelism = maxParallelEmails },
          recipient =>
           {
         var response = client.SendEmail(request);
          bag.Add(new EmailSent() { JobId = jobId, MessageId = response.MessageId});
       }
    lock (syncRoot) 
        {
             count++;
             if (count % 50 == 0 || count == recipients.Count) 
              {
               var list = new List<EmailSent>();
                 while (!bag.IsEmpty)
                  {
                   EmailSent email;
                     if (bag.TryTake(out email))
                      {
                        list.Add(email);

                      }

               }
            repo.InsertBulk<EmailSent>(list);
       }
});

【问题讨论】:

  • 如果您为每封电子邮件生成一个唯一的 ID,这样它就不会在数据库中重复,并且还知道上次它在哪里被遗漏了。我肯定会在数据库应该很快..所以每发送 50 封电子邮件插入应该在一个线程中快速完成..

标签: .net amazon-web-services npoco amazon-ses


【解决方案1】:

如果您只是在寻找优化,一种方法是使用table valued parameters 进行插入,这样您就可以将多条记录发送到您的存储过程,而不是为每个插入调用一次。

在您的 SQL 服务器上定义参数类型,这看起来很像定义一个表。 (大部分示例来自上面的链接。)

CREATE TYPE dbo.CategoryTableType AS TABLE
( CategoryID int, CategoryName nvarchar(50) )

然后你将那个类型的参数添加到你的插入过程中:

CREATE PROCEDURE usp_UpdateCategories 
(@tvpNewCategories dbo.CategoryTableType READONLY)

在您的存储过程中,您可以从该参数中进行选择,就像是一个表变量一样。

INSERT INTO dbo.Categories (CategoryID, CategoryName)
SELECT nc.CategoryID, nc.CategoryName FROM @tvpNewCategories AS nc;

这样做的好处是您可以将所有插入作为单个操作执行。

在应用程序端,您将创建一个与您定义的表类型相对应的 DataTable。然后用您要插入的记录填充该表。

最后,在调用过程时,添加一个以你的DataTable为值的参数,指定SqlDbType = SqlDbType.Structured,'TypeName'是表类型的名称。

SqlParameter tvpParam = insertCommand.Parameters.AddWithValue(
"@tvpNewCategories", yourDataTable);
tvpParam.SqlDbType = SqlDbType.Structured;
tvpParam.TypeName = "dbo.CategoryTableType";

如果您在 2008 年之前使用过 SQL Server,您就会知道我们做了一些奇怪的事情来将多条记录传递给一个过程,例如连接字符串或发送和解析 XML。这更容易,并且大大减少了单个存储过程调用的数量。

【讨论】:

    【解决方案2】:

    您也可以使用ConcurrentQueue 代替ConcurrentBag。当您添加到队列或取出项目时,您不必担心锁定任何内容。如果您想以 n 的批次保存记录,您可以不断TryDeqeueue 并将出队的项目添加到集合中,直到集合计数为 nTryDequeue返回false,表示队列中没有任何内容。

    【讨论】:

      猜你喜欢
      • 2010-12-18
      • 2022-11-16
      • 2012-07-19
      • 1970-01-01
      • 2019-09-07
      • 2011-03-19
      • 2018-09-20
      • 2019-11-21
      • 1970-01-01
      相关资源
      最近更新 更多