【问题标题】:MassTransit not subscribing to AzureServiceBus TopicMassTransit 未订阅 AzureServiceBus 主题
【发布时间】:2023-04-08 12:20:02
【问题描述】:

我目前正在尝试使用 MassTransit 6.3.2 更新最初是 .NET Core 3.1 的应用程序。它现在配置为使用 .NET 6.0 和 MassTransit 7.3.0

我们的应用程序使用 MassTransit 通过 Azure 服务总线发送消息,将消息发布到主题,然后让其他订阅者监听这些主题。

砍掉,它是这样实现的:

// Program.cs
services.AddMassTransit(config =>
{
   config.AddConsumer<AppointmentBookedMessageConsumer>();
   config.AddBus(BusControlFactory.ConfigureAzureServiceBus);
});


// BusControlFactory.cs
public static class BusControlFactory
{
   public static IBusControl ConfigureAzureServiceBus(IRegistrationContext<IServiceProvider> context)
   {
      var config = context.Container.GetService<AppConfiguration>();
      var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
      {
         busFactoryConfig.Host("Endpoint=sb://REDACTED-queues.servicebus.windows.net/;SharedAccessKeyName=MyMessageQueuing;SharedAccessKey=MyKeyGoesHere");
         busFactoryConfig.Message<AppointmentBookedMessage>(m => m.SetEntityName("appointment-booked"));
         busFactoryConfig.SubscriptionEndpoint<AppointmentBookedMessage>(
            "my-subscriber-name",
            configurator =>
            {
               configurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(60)));
               configurator.Consumer<AppointmentBookedMessageConsumer>(context.Container);
            });
         return azureServiceBus;
      }
   }
}

它现在已更改并升级到最新的 MassTransit,其实现方式如下:

// Program.cs
services.AddMassTransit(config =>
{
   config.AddConsumer<AppointmentBookedMessageConsumer, AppointmentBookedMessageConsumerDefinition>();
   config.UsingAzureServiceBus((context, cfg) => 
   {
      cfg.Host("Endpoint=sb://REDACTED-queues.servicebus.windows.net/;SharedAccessKeyName=MyMessageQueuing;SharedAccessKey=MyKeyGoesHere");
      cfg.Message<AppointmentBookedMessage>(m => m.SetEntityName("appointment-booked"));

      cfg.ConfigureEndpoints(context);
});


// AppointmentBookedMessageConsumerDefinition.cs
public class AppointmentBookedMessageConsumerDefinition: ConsumerDefinition<AppointmentBookedMessageConsumer>
{
   public AppointmentBookedMessageConsumerDefinition()
   {
      EndpointName = "testharness.subscriber";
   }

   protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<AppointmentBookedMessageConsumer> consumerConfigurator)
   {
      endpointConfigurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(60)));
   }
}

如果可以考虑的话,问题是我无法绑定到已经存在的订阅。

在上面的示例中,您可以看到EndpointName 设置为“testharness.subscriber”。在我升级之前已经订阅了“预约”主题。但是,当应用程序运行时,它不会出错,但不会收到任何消息。

如果我将EndpointName 更改为“testharness.subscriber2”。另一个订阅者出现在 Azure 服务总线主题中(通过 Azure 门户),我开始接收消息。我看不出名称有什么不同(除了我放置的更改,在这种情况下:“2”后缀)。

我在这里遗漏了什么吗?我还需要做些什么来让这些绑定吗?我的配置错了吗?是不是错了?虽然我确信我可以通过更密切地管理版本并在他们使用新的队列时删除不需要的队列来解决这个问题 - 但感觉就像是错误的方法。

【问题讨论】:

    标签: c# azure azureservicebus masstransit


    【解决方案1】:

    使用 Azure 服务总线,订阅上的ForwardTo 可能有点不透明。

    虽然订阅可能确实在视觉上表明它正在转发到正确的命名队列,但可能队列在某个时间点被删除并重新创建而没有删除订阅。这会导致订阅将建立消息,因为它无法将它们转发到不再存在的队列。

    为什么?在内部,订阅将 ForwardTo 维护为 object id,在队列被删除后,它指向一个不存在的对象 - 导致订阅中累积消息。

    如果订阅中有消息,您可能需要进入门户并更新该订阅以指向新队列(即使它具有相同的名称),此时消息应该流向队列。

    如果订阅中没有任何消息(或者如果它们不重要),您可以删除订阅,当您重新启动巴士时,MassTransit 会重新创建它。

    【讨论】:

    • 这是有道理的。所以这并不是说有什么特别做错了——只是 MassTransit 无法绑定到它以前绑定的队列。因此,重点是确保我们不会丢失消息等。感谢您的回复!
    • 是的,更多的是它“认为”它已正确绑定到队列(订阅显示正确的名称),但底层对象引用的队列不再存在。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-09-18
    • 2017-08-11
    • 2016-09-12
    • 2020-09-20
    • 2018-06-17
    • 2016-06-10
    • 2014-09-12
    相关资源
    最近更新 更多