【发布时间】:2020-04-29 01:00:01
【问题描述】:
我一直在玩 PreFetch,并试图弄清楚为什么 PreFetch 在队列的管理界面上总是设置为 0。在 RabbitMQ 管理界面中,我可以看到通道上配置的 Prefetch,但看不到队列本身。我还注意到它们被注册为“全球”而不是“每个消费者”,但对于我的生活,我似乎无法在 MassTransit 中找到改变它的设置,尽管我猜我有一个误解关于这是如何工作的,文档并没有帮助给我一个 ELI5。
这是一个示例配置:
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
"TEST-QUEUE-PF",
ec =>
{
ec.Consumer<MyConsumer>(context);
ec.PrefetchCount = 50; // consumer specific
ec.UseConcurrencyLimit(1); // consumer specific
});
cfg.PrefetchCount = 100; // bus control specific
cfg.UseConcurrencyLimit(1); // bus control specific
});
这将创建以下队列:
然后查看频道,我看到以下有关预取的信息:
如果我查看所有频道,我会看到以下内容:
我很难理解这些 PrefetchCounts 中的每一个都与什么相关。
作为背景知识,我们有几个运行消费者的多核服务器(即循环,或者更恰当地说是“饥饿的河马”,因为我不关心平等分配)。 PrefetchCount 和 ConcurrencyLimit 的默认设置对我们来说不是很好,因为我们的消费者有很多工作要做,而且数据库服务器超载导致超时。我正在寻找一种方法来配置这些消费者,以免他们这样做。
这是 MassTransit 5.5.5,因为任何超出的东西都会破坏 UseSerilog() 集成,我找不到简单的升级路径。 Erlang 和 RabbitMq 本身就是当前版本。这是更详细的 AutoFac 模块:
private class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
builder.Register(context =>
{
var busSettings = context.Resolve<BusSettings>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
$"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
ec =>
{
ec.PrefetchCount = 50;
ec.UseConcurrencyLimit(2);
ec.Consumer<MyConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
retryConfig
.Intervals(new[] { 1, 2, 4, 8, 16, 32 }
.Select(t => TimeSpan.FromMinutes(t))
.ToArray());
retryConfig
.Handle<HttpRequestException>();
retryConfig
.Handle<SwaggerException>(ex => ex.IsRetryValid());
});
});
cfg.PrefetchCount = 100;
cfg.UseConcurrencyLimit(2);
cfg.UseSerilog();
var correlationIdProvider = context.Resolve<ICorrelationProvider>();
cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
{
sendContext.CorrelationId =
sendContext.CorrelationId == Guid.Empty ?
correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
}));
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}
【问题讨论】:
标签: c# rabbitmq masstransit