【问题标题】:Save entities in batch to Database将实体批量保存到数据库
【发布时间】:2020-05-03 09:46:11
【问题描述】:

我正在创建一个将日志消息存储在数据库中的 Web API。要求指定我的 API 需要能够并行存储每秒 900 条消息。我创建了以下简单的存储库方法来插入实体:

public async Task<int> InsertListAsync(IEnumerable<LogMessage> logMessages)
{
    await _context.LogMessages.AddRangeAsync(logMessages);
    return await _context.SaveChangesAsync();
}

这是我的控制器代码:

public async Task<IActionResult> LogMessage([FromBody] IEnumerable<LogMessageDTO> logMessageModels)
{
    try
    {
        var logMessages = logMessageModels.Select(x => LogMessageDTOMapper.Map(x));
        await _repo.InsertListAsync(logMessages);
        return new OkResult();
    }
    catch (Exception e)
    {
        _logger.LogError(1000,e,"Error while processing LogMessages");
        return BadRequest($"Error while processing LogMessages:{e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

此测试适用于同时发送请求的客户端数量相对较少(最多 90 个)。 但是,我编写了以下集成测试(GetStringContentForPostRequest 函数基本上会创建一定数量的随机生成的 LogMessage,在本例中为 1):

[Fact]
public void Test_MultipleRequests()
{
    // Arrange
    var client = _factory.CreateClient();
    //Act
    var responses = new List<Task<HttpResponseMessage>>();
    for (int i = 0; i < 900; i++)
    {

        responses.Add(client.PostAsync("/logs", GetStringContentForPostRequest(1)));
    }
    Task.WaitAll(responses.ToArray());
    // Assert
    foreach (var item in responses)
    {
        item.Result.EnsureSuccessStatusCode(); // Status Code 200-299
    }
}

此测试一次发送 900 个请求,每个请求包含一个 LogMessage。但是,此测试失败,并显示错误消息:

System.InvalidOperationException: An exception has been raised that is likely due to a transient failure.
 ---> Npgsql.PostgresException (0x80004005): 53300: sorry, too many clients already

一种解决方案可能是在我的 Postgres 数据库服务器上增加允许的连接数量,但有没有一种简单的方法可以首先收集一段时间内收到的所有日志消息并将它们一次全部保存到数据库中?

【问题讨论】:

  • 你已经在使用连接池了吗?
  • @madflow 我没试过。
  • 这听起来像是一个连接池可以让您摆脱烦恼的例子。但是 - 我不知道 ef-core 是否有这个内置:S
  • 谢谢!我试试看

标签: c# postgresql asp.net-core-mvc ef-core-3.0


【解决方案1】:

我通过使用 ConcurrentQueue 作为我的 LogMessages 的缓冲区来解决它。我没有将它们直接保存到我的控制器中的数据库中,而是将它们添加到缓冲区中,该缓冲区作为单例注入到我的 API 中。

public class Buffer: IBuffer
{
    private ConcurrentQueue<LogMessageDTO> _logMessageDTOs;
    public Buffer()
    {
        _logMessageDTOs = new ConcurrentQueue<LogMessageDTO>();
    }
    public ConcurrentQueue<LogMessageDTO> LogMessageDTOs()
    {
        return _logMessageDTOs;
    }

}

所以我的控制器动作现在看起来像这样:

public async Task<IActionResult> LogMessage([FromBody] IEnumerable<LogMessageDTO> logMessageModels)
{
    try
    {
        await Task.Run(() =>
        {
            foreach (var item in logMessageModels)
            {
                _buffer.LogMessageDTOs().Enqueue(item);
            }
        });
        return new OkResult();
    }
    catch (Exception e)
    {
        _logger.LogError(1000, e, "Error while processing LogMessages");
        return BadRequest($"Error while processing LogMessages:{e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

这会为调用此 API 的消费者产生更快的响应时间。

这个 ConcurrentQueue 然后被 TimedHostService 清空,我在 Microsoft docs page 上阅读了更多关于它的内容。这个定时主机服务的本质是清空并发队列,并将项目保存到数据库中。这是我的 TimedHostService 的DoWork 方法:

private async void DoWork(object state)
{
    _logger.LogInformation("Timed background service is working");
    var logMessageDTOs = _buffer.LogMessageDTOs();
    using (var scope = _services.CreateScope())
    {
        var repository =
            scope.ServiceProvider
                .GetRequiredService<ILogMessageRepository>();
        var mappedLogMessages = new List<LogMessage>();
        while (logMessageDTOs.TryDequeue(out LogMessageDTO logMessageDTO))
        {
            mappedLogMessages.Add(LogMessageDTOMapper.Map(logMessageDTO,_logMessagePrefix));
        }
        await repository.InsertListAsync(mappedLogMessages);
    }
    _logger.LogInformation("Timed background service has stopped working");
}

这导致需要到数据库的连接要少得多,因为并非每个控制器操作都会实例化与数据库的新连接。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-02-02
    • 2021-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多