【发布时间】:2019-07-01 07:10:38
【问题描述】:
我正在尝试使用 MassTransit 通过 Azure 服务总线队列进行请求/响应通信。 Sender 是 Azure WebApp,Consumer 是安装在本地机器上的 Windows 服务。
当涉及少量消息时,一切正常。但是,一旦我开始发送超过 ~20 msg/sec 的消息,我就会看到消费者的响应出现严重(1-2 秒)延迟。我的遥测数据告诉我,当消费者需要从队列中获取消息时,会发生延迟。
一个奇怪的,但我认为行为的重要部分:我可以看到队列中未读消息的当前负载量是平均常数,它是 25。如果我发送的消息比我在队列中看到的平均 50 条消息多 2 倍.由于消费方面的延迟,我预计队列会增长,但它是恒定的,所以它肯定是内部代码限制连接。
快速信息:
- 机器上的硬件没有问题。 CPU/Mem 不高。
- 我尝试在用户端使用 UseConcurrencyLimit、MaxConcurrentCalls、PrefetchCount 配置。它没有帮助
- 我的发送者和消费者的解决方案代码在经典示例旁边。
消费者:.Net 框架 4.7.2 和 MassTransit.Azure.ServiceBus.Core 5.5.2
这是我删除了所有业务逻辑的侦听器类:
public class QueueListener
{
private IBusControl Bus { get; set; }
public QueueListener()
{
Bus = MassTransit.Bus.Factory.CreateUsingAzureServiceBus(serviceBusFactoryConfigurator =>
{
var host = serviceBusFactoryConfigurator.Host(SettingsHelper.AzureServiceBusConnectionString,
(config) =>
{
config.OperationTimeout = TimeSpan.FromSeconds(60);
config.TransportType = TransportType.AmqpWebSockets;
});
serviceBusFactoryConfigurator.ReceiveEndpoint(host, SettingsHelper.CouponQueryQueueName, e =>
{
e.Handler<JToken>(HandleMessage);
e.UseConcurrencyLimit(16);
e.MaxConcurrentCalls = 16;
e.PrefetchCount = 32;
});
serviceBusFactoryConfigurator.EnableBatchedOperations = true;
serviceBusFactoryConfigurator.DefaultMessageTimeToLive = TimeSpan.FromSeconds(60);
});
}
private async Task HandleMessage(ConsumeContext context)
{
await Task.Delay(800);
if (context.ExpirationTime > SystemDateTime.Now)
{
await context.RespondAsync(new CouponUsedList { CouponsUsed = new List<CouponCurrentUsed>() });
}
}
public Task LaunchAsync()
{
return Bus.StartAsync();
}
public Task StopAsync()
{
return Bus.StopAsync();
}
}
【问题讨论】:
-
您可以使用github.com/MassTransit/MassTransit-Benchmark 对您的环境进行基准测试 - 可能会给您一些想法。此外,如果您发布有助于向您提供反馈的配置。您不需要任何过滤器,只需要在接收端点上使用 MaxConcurrentCalls 和 PrefetchCount 来调整 ASB 的性能。
-
@ChrisPatterson,我如何运行基准测试?我已经启动了它——它创建了一个队列并向它推送了 10K 条消息。什么都没有:)有什么错误?
-
@ChrisPatterson 我已经用听众的代码更新了我的问题。敬请期待。请注意,我添加了 800 毫秒的延迟来模拟将要发生的异步依赖调用
-
所以,数学上,0.8 秒乘以 16 个并发大约是每秒 20 条消息。因此,除非您增加 MaxConcurrentCalls,否则您所看到的就是您将得到的。您也可以删除 UseConcurrencyLimit,它不是必需的。
-
@ChrisPatterson,原来我在消费者端缺少一个配置器。顺便说一句,我如何启动基准测试?我做不到
标签: message-queue azureservicebus masstransit