【问题标题】:Tasks not being processed as they complete完成后未处理的任务
【发布时间】:2016-12-09 14:18:08
【问题描述】:

我一直在尝试处理大量数据库查询的结果,因为它们变得可用。现在它们都是同时处理的。

有一个客户列表(现在大约 40 个),每个客户都有自己的数据库,并且都需要查询(使用 EF-6)。存储库上的查询方法返回一个任务。这些任务是循环创建的。

public Task<List<Employee>> FindByNameOrBirthDateAsync(string surName, DateTime? birthDate)
    {
        var query = ApplicationContext.Employees
            .Include(e => e.Address)
            .Include(e => e.CorporateEntities);

        if (!string.IsNullOrWhiteSpace(surName))
            query = query.Where(e => e.Surname == surName);

        if (birthDate != null)
            query = query.Where(e => e.BirthDate == birthDate);

        return query.ToListAsync();
    }

此代码被一个服务调用,而该服务又被一个 Web-API 控制器调用。

public class SearchService
{
    public List<Task<List<Employee>>> SearchOverAllClients(List<Client> clients, string surName, DateTime? birthDate)
    {
        return clients.Select(client => FindEmployees(client.Id, surName, birthDate)).ToList();
    }

    private Task<List<Employee>> FindEmployees(int clientId, string surname, DateTime? birthDate)
    {
        ApplicationUoW unit = new ApplicationUoW(clientId);

        return unit.Employees.FindByNameOrBirthDateAsync(surname, birthDate);

    }
}

在控制器中,Task.WhenAny() 应该在结果可用时将结果返回给客户端并将它们写入输出流。

我已经在每次迭代中使用 Fiddler 进行了测试,结果一次发送一个。所以我认为问题不在于流式编写代码。

    public HttpResponseMessage Get([FromUri]string surname = "", [FromUri]DateTime? birthDate = null)
    {
        List<Client> clients = _clientsService.GetClients();
        List<Task<List<Employee>>> tasks = _searchService.SearchOverAllClients(clients, surname, birthDate);

        HttpResponseMessage response = Request.CreateResponse();

        response.Content = new PushStreamContent(
            async (outputStream, httpContent, transportContext) =>
            {
                try
                {
                    while (tasks.Count > 0)
                    {
                        var task = await Task.WhenAny(tasks);
                        tasks.Remove(task);

                        List <Employee> employees = await task;
                        string responseContent =
                            await Task.Factory.StartNew(() => JsonConvert.SerializeObject(employees, Formatting.None, new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }));

                        var buffer = Encoding.UTF8.GetBytes(responseContent);

                        await outputStream.WriteAsync(buffer, 0, buffer.Length);
                        await outputStream.FlushAsync();
                    }
                }
                catch (HttpException ex) when (ex.ErrorCode == -2147023667)
                {
                    //The remote host closed the connection.
                    return;
                }
                finally
                {
                    outputStream.Close();
                }
            });

        return response;
    }

此代码的问题在于它在处理结果之前等待所有任务完成。虽然我想在它们准备好后立即处理它们。

我一直在尝试使用 aysnc/await 的不同用法对该代码进行多种变体,但是到目前为止所有尝试均未成功。

更新 我已经按照 cmets 中的建议删除了不需要的并行代码。

【问题讨论】:

  • 嗯,List&lt;T&gt; 不是线程安全的,你不能从 Parallel.Foreach 内部调用 Add 而不进行某种锁定。
  • “并行”数据库查询通常较慢来自一个在一次往返中加载所需数据的查询。那是因为主要延迟是由网络延迟引起的。如果没有,您应该考虑优化索引,或检查 N+1 问题。顺便说一句,单个实体的并行加载一个 N+1 问题
  • 真正的吞吐量杀手是您通过并发连接数增加争用和锁定。这样可能会导致一些丑陋的死锁
  • @PanagiotisKanavos 我们使用并行查询的原因是因为我们正在查询不同的数据库(具有相同的模式)。这可能在不同的物理服务器上。
  • @ArnoldWiersma 不,您不应该使用ConcurrentBag。相反,根本没有理由使用Parallel.ForEach 启动所有任务。只需使用常规的foreach,或者如前所述,使用来自LINQ 的Select。创建新线程只是为了启动异步操作是积极的counter生产。

标签: c# entity-framework asp.net-web-api async-await


【解决方案1】:

我想你要找的是ConfigureAwait(false)

ASP.NET 实现SynchronizationContext 的方式保证async 代码将在整个执行过程中的任何给定时间在单个线程(但不一定是同一个线程)上运行。这意味着您现在发布的代码都将在单个线程上运行。例如,如果所有查询任务立即完成,var task = await Task.WhenAny(tasks) 之后的回调代码将不会并行运行 - 每个回调将同步运行,一个接一个。

通过转换为:await Task.WhenAny(tasks).ConfigureAwait(false),您告诉 .NET 回调代码可以在线程池线程上运行。但是你必须确保await 之后的代码是线程安全的并且不需要上下文。

如果您还不熟悉 SynchronizationContext 以及使用 async/await 时它的重要性,我发现这篇文章很有帮助: Await, SynchronizationContext, and Console Apps

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-06-18
    • 2013-09-09
    • 1970-01-01
    • 1970-01-01
    • 2015-05-21
    • 2022-01-15
    • 2013-11-24
    相关资源
    最近更新 更多