【问题标题】:Parallel.ForEach freezes after some time [duplicate]Parallel.ForEach 在一段时间后冻结[重复]
【发布时间】:2017-08-02 14:32:47
【问题描述】:

我想使用Parallel.ForEach 添加 1000 多个任务。下面的代码用于发送电子邮件通知。问题是它仅适用于大约 150 ~ 200 条通知并且我收到了电子邮件,但之后代码被冻结并且没有收到任何电子邮件。

有人可以指导我正确的方向吗?

var exceptions = new ConcurrentQueue<Exception>();

try
{
    List<ParallelWorker_EmailNotification> workers = new List<ParallelWorker_EmailNotification>();

    foreach (Email mail in listEmails)
    {
        workers.Add(new ParallelWorker_EmailNotification(mail));
    }

    Parallel.ForEach(workers, async worker =>
    {
        try
        {
            await worker.SendNotification();
        }
        catch (Exception ex)
        {
            exceptions.Enqueue(ex);
        }
    });
}
catch (Exception ex)
{
    exceptions.Enqueue(ex);
}

【问题讨论】:

  • 从 ParallelWorker_EmailNotification 添加代码

标签: c# .net linq parallel-processing


【解决方案1】:

Parallel.ForEach 不适用于传入的异步函数,async worker =&gt; 的方法签名是 async void,这可能是您问题的根源。 Parallel.ForEach我们解除阻塞,因为它认为工作已经完成,但工作仍在后台处理,这就是为什么你看不到处理的项目。

最简单的解决方案(如果 SendNotification 是一个适当的异步函数)只是选择项目并将所有任务放入 IEnumerable 并等待它们。

    var exceptions = new ConcurrentQueue<Exception>();

    try
    {
        var tasks = listEmails.Select(mail => new ParallelWorker_EmailNotification(mail))
                              .Select(async worker =>
            {
                try
                {
                    await worker.SendNotification();
                }
                catch (Exception ex)
                {
                    exceptions.Enqueue(ex);
                }
            });
        await Task.WhenAll(tasks);
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }

如果SendNotification 是一个需要一些时间才能将控制权交还给调用者的函数,那么最好的解决方案是使用TPL Dataflow 进行处理。

    var exceptions = new ConcurrentQueue<Exception>();

    try
    {
        var actionBlock = new ActionBlock<ParallelWorker_EmailNotification>(async worker =>
            {
                try
                {
                    await worker.SendNotification();
                }
                catch (Exception ex)
                {
                    exceptions.Enqueue(ex);
                }
            }, new ExecutionDataflowBlockOptions
                    {
                     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded                        
                    }
            );

        foreach (Email mail in listEmails)
        {
            actionBlock.Post(new ParallelWorker_EmailNotification(mail));
        }
        actionBlock.Complete();
        actionBlock.Completion.Wait();
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }

【讨论】:

  • TPL 数据流示例很好。有没有一种方法可以让代码不必等待“actionBlock.Completion.Wait();”并移动到下一批,但确保上一批确实完成?
  • 在某些时候,您必须“等待并检查”才能看到工作已完成,您想将检查放在哪里取决于您。如果您想从actionBlock.Completion 返回任务,您可以让调用者每批次调用一次您的函数,一旦所有批次完成,对所有返回的任务执行Task.WaitAll(batchTasks)await Task.WhenAll(batchTasks)
猜你喜欢
  • 1970-01-01
  • 2021-05-26
  • 2021-03-13
  • 2012-12-15
  • 1970-01-01
  • 2015-11-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多