【问题标题】:MassTransit servicebuse optimize consumerMassTransit 服务总线优化消费者
【发布时间】:2019-07-01 07:10:38
【问题描述】:

我正在尝试使用 MassTransit 通过 Azure 服务总线队列进行请求/响应通信。 Sender 是 Azure WebApp,Consumer 是安装在本地机器上的 Windows 服务。

当涉及少量消息时,一切正常。但是,一旦我开始发送超过 ~20 msg/sec 的消息,我就会看到消费者的响应出现严重(1-2 秒)延迟。我的遥测数据告诉我,当消费者需要从队列中获取消息时,会发生延迟。

一个奇怪的,但我认为行为的重要部分:我可以看到队列中未读消息的当前负载量是平均常数,它是 25。如果我发送的消息比我在队列中看到的平均 50 条消息多 2 倍.由于消费方面的延迟,我预计队列会增长,但它是恒定的,所以它肯定是内部代码限制连接。

快速信息:

  • 机器上的硬件没有问题。 CPU/Mem 不高。
  • 我尝试在用户端使用 UseConcurrencyLimit、MaxConcurrentCalls、PrefetchCount 配置。它没有帮助
  • 我的发送者和消费者的解决方案代码在经典示例旁边。

消费者:.Net 框架 4.7.2 和 MassTransit.Azure.ServiceBus.Core 5.5.2

这是我删除了所有业务逻辑的侦听器类:

public class QueueListener
{
    private IBusControl Bus { get; set; }

    public QueueListener()
    {
        Bus = MassTransit.Bus.Factory.CreateUsingAzureServiceBus(serviceBusFactoryConfigurator =>
        {
            var host = serviceBusFactoryConfigurator.Host(SettingsHelper.AzureServiceBusConnectionString,
                (config) =>
                {
                    config.OperationTimeout = TimeSpan.FromSeconds(60);
                    config.TransportType = TransportType.AmqpWebSockets;
                });
            serviceBusFactoryConfigurator.ReceiveEndpoint(host, SettingsHelper.CouponQueryQueueName, e =>
            {
                e.Handler<JToken>(HandleMessage);

                e.UseConcurrencyLimit(16);
                e.MaxConcurrentCalls = 16;
                e.PrefetchCount = 32;
            });
            serviceBusFactoryConfigurator.EnableBatchedOperations = true;
            serviceBusFactoryConfigurator.DefaultMessageTimeToLive = TimeSpan.FromSeconds(60);
        });
    }

    private async Task HandleMessage(ConsumeContext context)
    {
        await Task.Delay(800);
        if (context.ExpirationTime > SystemDateTime.Now)
        {
            await context.RespondAsync(new CouponUsedList { CouponsUsed = new List<CouponCurrentUsed>() });
        }

    }

    public Task LaunchAsync()
    {
        return Bus.StartAsync();
    }

    public Task StopAsync()
    {
        return Bus.StopAsync();
    }
}

【问题讨论】:

  • 您可以使用github.com/MassTransit/MassTransit-Benchmark 对您的环境进行基准测试 - 可能会给您一些想法。此外,如果您发布有助于向您提供反馈的配置。您不需要任何过滤器,只需要在接收端点上使用 MaxConcurrentCalls 和 PrefetchCount 来调整 ASB 的性能。
  • @ChrisPatterson,我如何运行基准测试?我已经启动了它——它创建了一个队列并向它推送了 10K 条消息。什么都没有:)有什么错误?
  • @ChrisPatterson 我已经用听众的代码更新了我的问题。敬请期待。请注意,我添加了 800 毫秒的延迟来模拟将要发生的异步依赖调用
  • 所以,数学上,0.8 秒乘以 16 个并发大约是每秒 20 条消息。因此,除非您增加 MaxConcurrentCalls,否则您所看到的就是您将得到的。您也可以删除 UseConcurrencyLimit,它不是必需的。
  • @ChrisPatterson,原来我在消费者端缺少一个配置器。顺便说一句,我如何启动基准测试?我做不到

标签: message-queue azureservicebus masstransit


【解决方案1】:

似乎在这里,再一次,它只是缺少一个配置。您在 ReceiveEndpoint 中编写的所有代码都配置了消费者侦听器队列,而您在 CreateUsingAzureServiceBus 中提供的配置是消费者响应队列的配置。

我只需要在消费者配置中添加一行。如果没有这个配置,所有预取的消息都会被逐步处理

e.EnableBatchedOperations = true;

【讨论】:

  • 鉴于这种性能优势,您认为这应该是新的默认值吗,true
  • 我正在重新审视这个结论,似乎它实际上默认为 true,这不是帮助...不知道是什么。
  • 我确实对 Azure 服务总线进行了性能测试,大部分时间都花在了等待它上,等待 Azure 服务总线响应/推送消息。
  • 有什么改进的方法吗?
猜你喜欢
  • 2021-11-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多