【发布时间】:2016-12-09 14:18:08
【问题描述】:
我一直在尝试处理大量数据库查询的结果,因为它们变得可用。现在它们都是同时处理的。
有一个客户列表(现在大约 40 个),每个客户都有自己的数据库,并且都需要查询(使用 EF-6)。存储库上的查询方法返回一个任务。这些任务是循环创建的。
public Task<List<Employee>> FindByNameOrBirthDateAsync(string surName, DateTime? birthDate)
{
var query = ApplicationContext.Employees
.Include(e => e.Address)
.Include(e => e.CorporateEntities);
if (!string.IsNullOrWhiteSpace(surName))
query = query.Where(e => e.Surname == surName);
if (birthDate != null)
query = query.Where(e => e.BirthDate == birthDate);
return query.ToListAsync();
}
此代码被一个服务调用,而该服务又被一个 Web-API 控制器调用。
public class SearchService
{
public List<Task<List<Employee>>> SearchOverAllClients(List<Client> clients, string surName, DateTime? birthDate)
{
return clients.Select(client => FindEmployees(client.Id, surName, birthDate)).ToList();
}
private Task<List<Employee>> FindEmployees(int clientId, string surname, DateTime? birthDate)
{
ApplicationUoW unit = new ApplicationUoW(clientId);
return unit.Employees.FindByNameOrBirthDateAsync(surname, birthDate);
}
}
在控制器中,Task.WhenAny() 应该在结果可用时将结果返回给客户端并将它们写入输出流。
我已经在每次迭代中使用 Fiddler 进行了测试,结果一次发送一个。所以我认为问题不在于流式编写代码。
public HttpResponseMessage Get([FromUri]string surname = "", [FromUri]DateTime? birthDate = null)
{
List<Client> clients = _clientsService.GetClients();
List<Task<List<Employee>>> tasks = _searchService.SearchOverAllClients(clients, surname, birthDate);
HttpResponseMessage response = Request.CreateResponse();
response.Content = new PushStreamContent(
async (outputStream, httpContent, transportContext) =>
{
try
{
while (tasks.Count > 0)
{
var task = await Task.WhenAny(tasks);
tasks.Remove(task);
List <Employee> employees = await task;
string responseContent =
await Task.Factory.StartNew(() => JsonConvert.SerializeObject(employees, Formatting.None, new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }));
var buffer = Encoding.UTF8.GetBytes(responseContent);
await outputStream.WriteAsync(buffer, 0, buffer.Length);
await outputStream.FlushAsync();
}
}
catch (HttpException ex) when (ex.ErrorCode == -2147023667)
{
//The remote host closed the connection.
return;
}
finally
{
outputStream.Close();
}
});
return response;
}
此代码的问题在于它在处理结果之前等待所有任务完成。虽然我想在它们准备好后立即处理它们。
我一直在尝试使用 aysnc/await 的不同用法对该代码进行多种变体,但是到目前为止所有尝试均未成功。
更新 我已经按照 cmets 中的建议删除了不需要的并行代码。
【问题讨论】:
-
嗯,
List<T>不是线程安全的,你不能从 Parallel.Foreach 内部调用Add而不进行某种锁定。 -
“并行”数据库查询通常较慢来自一个在一次往返中加载所需数据的查询。那是因为主要延迟是由网络延迟引起的。如果没有,您应该考虑优化索引,或检查 N+1 问题。顺便说一句,单个实体的并行加载是一个 N+1 问题
-
真正的吞吐量杀手是您通过并发连接数增加争用和锁定。这样可能会导致一些丑陋的死锁
-
@PanagiotisKanavos 我们使用并行查询的原因是因为我们正在查询不同的数据库(具有相同的模式)。这可能在不同的物理服务器上。
-
@ArnoldWiersma 不,您不应该使用
ConcurrentBag。相反,根本没有理由使用Parallel.ForEach启动所有任务。只需使用常规的foreach,或者如前所述,使用来自LINQ 的Select。创建新线程只是为了启动异步操作是积极的counter生产。
标签: c# entity-framework asp.net-web-api async-await