【问题标题】:MassTransit - random mediator response not consumedMassTransit - 未消耗随机中介响应
【发布时间】:2021-09-01 06:18:41
【问题描述】:

所以一百分之一的(中介)消息没有被消费,我不知道为什么。 通过 rabbitMq 发送的消息很好。

例外:

"InnerException": {
    "Type": "MassTransit.MessageNotConsumedException",
    "Uri": "loopback://localhost/response",
    "TargetSite": "System.Threading.Tasks.Task Send(MassTransit.ReceiveContext, GreenPipes.IPipe`1[MassTransit.ReceiveContext])",
    "Message": "loopback://localhost/response => The message was not consumed",
    "Data": {},
    "Source": "MassTransit",
    "HResult": -2146233088
}

那么发生了什么。我有一个同时使用 Mediator 和 RabbitMQ 的 API。大部分时间一切正常。我在这样的启动类中注册了两个:

        services.AddMassTransit(cfg =>
        {
            cfg.UsingRabbitMq(ConfigureRabbitMq);
        });

        services.AddMassTransitHostedService();

        services.AddMediator(cfg => 
        { 
            cfg.AddMediatorHandlers();
        });

        ....

    public void ConfigureRabbitMq(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator configurator)
    {
        var rabbitConfig = RabbitMqConfig.Get<RabbitMqConfiguration>();

        configurator.Host(rabbitConfig.Host, rabbitConfig.VirtualHost , hfg => 
        {
            hfg.Password(rabbitConfig.Password);
            hfg.Username(rabbitConfig.UserName);
        });
    }

现在在我的控制器中,当我收到请求时,我通过 Mediator 将其传递给应该将请求保存到数据库的处理程序,将其传递给 rabbitMq 并在请求被保存/发送回控制器时返回消息。

这里是控制器:

    private readonly IRequestClient<CreateCommand> _createProcessor;

    public async Task<IActionResult> CreateManageServicesRequest([FromBody] CreateRequest request)
      {
        try
        {
            var result = await _createProcessor.GetResponse<CreateResponse>(new CreateCommand() { ApiRequest = request});

            if (result.Message.Queued.HasValue)
                return Ok();
            else
                return Accepted();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed httpRequest for {methodName}", nameof(CreateManageServicesRequest));
            return StatusCode((int)StatusCodes.Status500InternalServerError);
        }
    }

这是发生异常的处理程序:

    public async Task Consume(ConsumeContext<CreateCommand> context)
    {
        var endpoint = await bus.GetSendEndpoint(new Uri($"exchange:{rabbitOptions.ManageServiceQueueName}"));

        using (var sql = sqlFactory.Cip)
        {
            /*SAVE REQUEST TO DATABASE*/
            sql.StartTransaction(System.Data.IsolationLevel.Serializable);

            .... /* Not important for stack-overflow, just saving request to database */

            sql.Commit();

            /* QUEUE REQUEST*/
            try
            {
                    /*Queue via Rabbit MQ*/
                    await endpoint.Send(new SendCommand() { QueuedRequest = request }, context.CancellationToken);

                    /* Send response ---- THIS FAILS RANDOMLY ---- */
                    await context.RespondAsync(new CreateResponse() { Queued =  DateTimeOffset.Now });
                
            }
            catch (Exception ex)
            { 
                using (logger.BeginScope(new Dictionary<string, object>() { { "ManageServiceRequest", transactionId } }))
                    logger.LogError(ex, "Failed to queque manage service request");
            }
        }
    }

我不知道如何解决这个问题,也没有在 MassTransit 网络或 Git 上找到任何相关信息。

【问题讨论】:

  • 我在您提供的那些 sn-ps 中看不到任何 MediatR 代码。
  • Masstransit 实现了自己的中介
  • 所以,这不是 MediatR?在您的问题中,您提到了“mediatr”,因此造成了混淆。
  • 失败的请求是否达到了默认超时值(30s)?
  • 你好克里斯,我会尝试复制问题,看看是否是问题所在。我没有考虑过超时甚至对调解人有效。

标签: c# masstransit


【解决方案1】:

所以问题出在范围内。我没有添加中间件来获取控制器的范围。如此处所写:https://masstransit-project.com/usage/mediator.html

自从添加了AddHttpContextAccessor之后,问题就没有了。

【讨论】:

    猜你喜欢
    • 2020-10-04
    • 2020-08-17
    • 1970-01-01
    • 2021-07-19
    • 1970-01-01
    • 2019-11-23
    • 2012-11-23
    • 2021-05-24
    • 1970-01-01
    相关资源
    最近更新 更多