【问题标题】:MassTransit - Explanation of PrefetchCount and multiple channels for a single consumerMassTransit - PrefetchCount 和单个消费者的多个渠道的解释
【发布时间】:2020-04-29 01:00:01
【问题描述】:

我一直在玩 PreFetch,并试图弄清楚为什么 PreFetch 在队列的管理界面上总是设置为 0。在 RabbitMQ 管理界面中,我可以看到通道上配置的 Prefetch,但看不到队列本身。我还注意到它们被注册为“全球”而不是“每个消费者”,但对于我的生活,我似乎无法在 MassTransit 中找到改变它的设置,尽管我猜我有一个误解关于这是如何工作的,文档并没有帮助给我一个 ELI5。

这是一个示例配置:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
   var host = cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });

    cfg.ReceiveEndpoint(
        host,
        "TEST-QUEUE-PF",
        ec =>
        {
            ec.Consumer<MyConsumer>(context);
            ec.PrefetchCount = 50; // consumer specific
            ec.UseConcurrencyLimit(1); // consumer specific
        });

    cfg.PrefetchCount = 100; // bus control specific
    cfg.UseConcurrencyLimit(1); // bus control specific
});

这将创建以下队列:

然后查看频道,我看到以下有关预取的信息:

如果我查看所有频道,我会看到以下内容:

我很难理解这些 PrefetchCounts 中的每一个都与什么相关。

作为背景知识,我们有几个运行消费者的多核服务器(即循环,或者更恰当地说是“饥饿的河马”,因为我不关心平等分配)。 PrefetchCount 和 ConcurrencyLimit 的默认设置对我们来说不是很好,因为我们的消费者有很多工作要做,而且数据库服务器超载导致超时。我正在寻找一种方法来配置这些消费者,以免他们这样做。

这是 MassTransit 5.5.5,因为任何超出的东西都会破坏 UseSerilog() 集成,我找不到简单的升级路径。 Erlang 和 RabbitMq 本身就是当前版本。这是更详细的 AutoFac 模块:

private class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
        builder.Register(context =>
        {
            var busSettings = context.Resolve<BusSettings>();
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(
                    new Uri(busSettings.HostAddress),
                    h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                cfg.ReceiveEndpoint(
                    host,
                    $"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
                    ec =>
                    {
                        ec.PrefetchCount = 50;
                        ec.UseConcurrencyLimit(2);
                        ec.Consumer<MyConsumer>(context);
                        ec.EnablePriority(5);
                        ec.UseRetry(retryConfig =>
                        {
                            retryConfig
                                .Intervals(new[] { 1, 2, 4, 8, 16, 32 }
                                .Select(t => TimeSpan.FromMinutes(t))
                                .ToArray());
                            retryConfig
                                .Handle<HttpRequestException>();
                            retryConfig
                                .Handle<SwaggerException>(ex => ex.IsRetryValid());
                        });
                    });

                cfg.PrefetchCount = 100;
                cfg.UseConcurrencyLimit(2);
                cfg.UseSerilog();

                var correlationIdProvider = context.Resolve<ICorrelationProvider>();
                cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
                {
                    sendContext.CorrelationId = 
                        sendContext.CorrelationId == Guid.Empty ? 
                            correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
                }));
            });

            return busControl;
        })
        .SingleInstance()
        .As<IBusControl>()
        .As<IBus>();
    }
}

【问题讨论】:

    标签: c# rabbitmq masstransit


    【解决方案1】:

    首先,我假设您使用的是较旧版本的 MassTransit,因为从 v6 开始,该开关已不再使用全局预取。

    其次,高预取计数与 1 的并发限制相结合将导致 (prefetchcount - 1) 条消息位于等待处理的接收端点上,而一次处理 1 条消息。因此,如果只有 50 条消息,第一个节点可能会全部获取它们,然后您的其他节点处于空闲状态,因为消息在单个节点上等待通过瓶颈。

    当前版本的 RabbitMQ 管理控制台,通道预取如下所示:

    由于 MassTransit 仅将单个消费者放在通道上,以前的方法本质上将消费者限制为全局通道预取,但现在更加明确。此外,新设置适用于不支持全局预取设置的仲裁队列。

    如果您的数据库超载,并且已经优化了数据库查询以避免锁定/阻塞,并且需要减少流量,请将预取降低到接近 140% 的并发限制。所以,说真的,如果你是 1,请将 prefetch 设置为 2。

    【讨论】:

    • 我假设每个分布式消费者都使用相同的队列名称不是问题吗?
    • 关于上述评论,我不在乎哪个节点获取和处理消息,只要添加节点有助于更快地处理总消息即可。基本上我称之为“饥饿的河马”方法。关于版本,是的,这是 MassTransit 5.5.5。上面的任何其他内容似乎都破坏了 Serilog 集成 (),我在 AutoFac 模块中找不到简单的升级路径。我添加了完整的模块以使其更清晰。
    • 饥饿的河马?我称之为竞争消费者,这就是负载平衡与队列一起工作的方式,所以是的,完全没问题。无论哪个节点获得消息都会消耗它。只是不要让这么多人等待一个线程一次处理一条消息。
    • 另外,请注意,由于并发限制为 1 或 2,以及如此大的重试次数,这些消息将阻止任何预取,直到重试完成。如果服务不可用,这可能很有用,可以避免消息进入错误队列,但请注意这种行为。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-24
    • 2023-03-20
    • 1970-01-01
    • 2016-09-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多