【问题标题】:Best way to make send bulk emails parallel并行发送批量电子邮件的最佳方法
【发布时间】:2014-09-09 00:31:45
【问题描述】:

我是 TPL(任务并行库)的新手,我很难将我的进程配置为并行运行任务。

我正在开发一个发送大量电子邮件的应用程序(比如每分钟数千封,这就是我的想法),但是当我看到处理器性能时,它并不好:我很确定会有很多开销因为我没有正确使用任务库

这是我的代码:

public async void MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);
    
    foreach (var batch in batches.AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount))
    {
         await Task.WhenAll(from emailToProcess in batch 
                    select ProcessSingleEmail(emailToProcess));
        _emailsToProcessRepository.MarkBatchAsProcessed(batch);
    }
}

private async Task ProcessSingleEmail(EmailToProcess emailToProcess)
{
    try
    {
        MailMessage mail = GetMail(emailToProcess); //static light method
        await _smtpClient.SendAsync(sendGridMail);
        emailToProcess.Processed = true;
    }
    catch (Exception e)
    {
        _logger.Error(ErrorHelper.GetExceptionMessage(e, 
                    string.Format("Error sending Email ID #{0} : ", 
                    emailToProcess.Id)), e);
    }
}

(我知道它可能看起来很糟糕:请随意烤我☺)

我需要它以这种方式运行:我需要批量处理多个记录(顺便说一句,我正在使用允许我使用“批处理”方法的库),因为我需要标记一个批处理进程完成发送时在数据库中处理的记录数。

这个过程实际上正在做我想做的事:除了慢得要命。正如您在 perfmon 中看到的那样,处理器并没有以非常高的容量工作:

最好的方法是什么?有什么建议吗?

编辑:我意识到我所拥有的是一个开销问题。 有什么工具或简单的方法可以检测和纠正它们吗?

【问题讨论】:

  • 我希望限制因素是您的网络带宽,而不是您的 CPU... 随心所欲地并行化您的 CPU,它不会为您提供更快的网络连接。
  • @abelenky 感谢您的回复,但我认为这不是问题所在。我什至尝试用 Task.Delay(1500) 替换 SendAsync,这大约是发送电子邮件所需的,结果完全一样。
  • 当然你的网络是这里的限制因素 - 但我认为你可以在这里做的比.WithDegreeOfParallelism(Environment.ProcessorCount) 更多,因为你正在尝试异步(实验一下) - 除此之外你可能不得不尝试框架 Smtpclient 的替代方案......顺便说一句:这将是什么样的批量电子邮件?我们周围有足够的垃圾邮件;)
  • 哈哈,谢谢@CarstenKönig,它不是垃圾邮件,只是一个类似于“linkedin”警报的小型网站。我不认为网络是问题,因为我尝试使用 Task.Delay 模拟 smtpclient,这意味着没有调用其他需要网络的操作并且结果是相同的。很好的观察,我没有使用 SmtpClient 而是使用 http 的 Sendgrid api。
  • 不,你说你用 1.5 秒的 Task.Delay 嘲笑它并得到了相同的结果(没有任何证据) - 所以如果你的发送确实需要 1.5 秒。 (你无法控制的代码位)显然有地方开始寻找(你可以像我说的那样增加并行任务的数量 - 但 1500 毫秒是巨大的!)

标签: c# .net parallel-processing task-parallel-library .net-4.5


【解决方案1】:

您所做的不是 CPU 限制,而是 I/O 限制,因此如果处理器可能会影响您的性能,请使用将并发任务的数量限制为数量。尝试并行启动更多任务。

例如,下面的代码将异步处理所有电子邮件,但并行处理限制为 100 封电子邮件。它使用ForEachAsync 扩展方法进行处理,该方法允许使用参数限制并行度,因此我会尝试尝试使该参数更大。

如果可能,您可能还希望使 MarkBatchAsProcessed 方法异步,因为这也可能会限制性能。

public static class Extensions
{
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}

public Task MainProcess()
{
    // Process 100 emails at a time
    return emailsToProcess.ForEachAsync(100, async (m) =>
    {
        await ProcessSingleEmail(m);                
    });

    _emailsToProcessRepository.MarkBatchAsProcessed(emailsToProcess);
}

您还应该避免使用 void 返回异步方法,它们不会传播异常并且无法组合或等待,并且它们主要用于事件处理程序,因此我将 MainProcess 更改为返回 Task

更新

上面代码中的数字 100 表示任何时候最多有 100 个并发任务,所以它更像是一个滑动窗口而不是一个批处理。如果您想批量处理电子邮件,可以执行以下操作(假设批次具有 Count 属性:

public async Task MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);

    foreach (var batch in batches)
    {
         return batch.ForEachAsync(batch.Count, async (m) =>
         {
             await ProcessSingleEmail(m);                
         });

       _emailsToProcessRepository.MarkBatchAsProcessed(batch);             
    }
}

【讨论】:

  • 感谢@ned-stoyanov 花时间向我展示这种方法!只是几个 cmets:1)我认为 100 表示分区数而不是分区大小。 2) 我应该在哪里更新批次?我应该将它作为操作参数传递给相同的扩展方法并在 using(partition) 结束时执行吗?
猜你喜欢
  • 2011-09-08
  • 2012-06-14
  • 2019-06-26
  • 2014-08-19
  • 1970-01-01
  • 1970-01-01
  • 2011-10-02
  • 2012-09-20
  • 1970-01-01
相关资源
最近更新 更多