【发布时间】:2021-09-01 06:18:41
【问题描述】:
所以一百分之一的(中介)消息没有被消费,我不知道为什么。 通过 rabbitMq 发送的消息很好。
例外:
"InnerException": {
"Type": "MassTransit.MessageNotConsumedException",
"Uri": "loopback://localhost/response",
"TargetSite": "System.Threading.Tasks.Task Send(MassTransit.ReceiveContext, GreenPipes.IPipe`1[MassTransit.ReceiveContext])",
"Message": "loopback://localhost/response => The message was not consumed",
"Data": {},
"Source": "MassTransit",
"HResult": -2146233088
}
那么发生了什么。我有一个同时使用 Mediator 和 RabbitMQ 的 API。大部分时间一切正常。我在这样的启动类中注册了两个:
services.AddMassTransit(cfg =>
{
cfg.UsingRabbitMq(ConfigureRabbitMq);
});
services.AddMassTransitHostedService();
services.AddMediator(cfg =>
{
cfg.AddMediatorHandlers();
});
....
public void ConfigureRabbitMq(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator configurator)
{
var rabbitConfig = RabbitMqConfig.Get<RabbitMqConfiguration>();
configurator.Host(rabbitConfig.Host, rabbitConfig.VirtualHost , hfg =>
{
hfg.Password(rabbitConfig.Password);
hfg.Username(rabbitConfig.UserName);
});
}
现在在我的控制器中,当我收到请求时,我通过 Mediator 将其传递给应该将请求保存到数据库的处理程序,将其传递给 rabbitMq 并在请求被保存/发送回控制器时返回消息。
这里是控制器:
private readonly IRequestClient<CreateCommand> _createProcessor;
public async Task<IActionResult> CreateManageServicesRequest([FromBody] CreateRequest request)
{
try
{
var result = await _createProcessor.GetResponse<CreateResponse>(new CreateCommand() { ApiRequest = request});
if (result.Message.Queued.HasValue)
return Ok();
else
return Accepted();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed httpRequest for {methodName}", nameof(CreateManageServicesRequest));
return StatusCode((int)StatusCodes.Status500InternalServerError);
}
}
这是发生异常的处理程序:
public async Task Consume(ConsumeContext<CreateCommand> context)
{
var endpoint = await bus.GetSendEndpoint(new Uri($"exchange:{rabbitOptions.ManageServiceQueueName}"));
using (var sql = sqlFactory.Cip)
{
/*SAVE REQUEST TO DATABASE*/
sql.StartTransaction(System.Data.IsolationLevel.Serializable);
.... /* Not important for stack-overflow, just saving request to database */
sql.Commit();
/* QUEUE REQUEST*/
try
{
/*Queue via Rabbit MQ*/
await endpoint.Send(new SendCommand() { QueuedRequest = request }, context.CancellationToken);
/* Send response ---- THIS FAILS RANDOMLY ---- */
await context.RespondAsync(new CreateResponse() { Queued = DateTimeOffset.Now });
}
catch (Exception ex)
{
using (logger.BeginScope(new Dictionary<string, object>() { { "ManageServiceRequest", transactionId } }))
logger.LogError(ex, "Failed to queque manage service request");
}
}
}
我不知道如何解决这个问题,也没有在 MassTransit 网络或 Git 上找到任何相关信息。
【问题讨论】:
-
我在您提供的那些 sn-ps 中看不到任何 MediatR 代码。
-
Masstransit 实现了自己的中介
-
所以,这不是 MediatR?在您的问题中,您提到了“mediatr”,因此造成了混淆。
-
失败的请求是否达到了默认超时值(30s)?
-
你好克里斯,我会尝试复制问题,看看是否是问题所在。我没有考虑过超时甚至对调解人有效。
标签: c# masstransit