【发布时间】:2019-09-26 13:04:58
【问题描述】:
我正在使用 MassTransit/RabbitMQ 在生产者/消费者场景中使用 2 个 .NET Core 控制台应用程序。我需要确保即使没有消费者启动并运行,来自生产者的消息仍然成功排队。这似乎不适用于 Publish() - 消息刚刚消失,所以我使用 Send() 代替。消息至少会排队,但如果没有任何消费者运行消息,所有消息都会最终进入“_skipped”队列。
这是我的第一个问题:这是基于需求的正确方法吗(即使没有消费者启动并运行,来自生产者的消息仍然成功排队)?
使用 Send(),我的消费者确实可以工作,但仍有许多消息从裂缝中掉入并被转储到“_skipped”队列中。消费者的逻辑很少(此时只是记录消息),所以它不是一个长时间运行的过程。
这是我的第二个问题:为什么还有这么多消息被转储到“_skipped”队列中?
这就引出了我的第三个问题:这是否意味着我的消费者也需要收听“_skipped”队列?
我不确定您需要为这个问题查看什么代码,但这里是 RabbitMQ 管理 UI 的屏幕截图:
生产者配置:
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
});
services.AddHostedService<CardMessageProducer>();
})
.UseConsoleLifetime()
.UseSerilog();
}
static IBusControl ConfigureBus(IServiceProvider provider)
{
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
{
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
});
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
{
EndpointConvention.Map<CardMessage>(e.InputAddress);
});
});
}
生产者代码:
Bus.Send(message);
消费者配置:
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<CardMessageConsumer>();
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
});
services.AddHostedService<MassTransitHostedService>();
})
.UseConsoleLifetime()
.UseSerilog();
}
static IBusControl ConfigureBus(IServiceProvider provider)
{
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
{
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
});
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
{
e.Consumer<CardMessageConsumer>(provider);
});
//cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
//{
// e.Consumer<CardMessageConsumer>(provider);
//});
});
}
消费者代码:
class CardMessageConsumer : IConsumer<CardMessage>
{
private readonly ILogger<CardMessageConsumer> logger;
private readonly ApplicationConfiguration configuration;
private long counter;
public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
{
this.logger = logger;
this.configuration = options.Value;
}
public async Task Consume(ConsumeContext<CardMessage> context)
{
this.counter++;
this.logger.LogTrace($"Message #{this.counter} consumed: {context.Message}");
}
}
【问题讨论】:
标签: rabbitmq masstransit