【问题标题】:MassTransit - Can Multiple Consumers All Receive Same Message?MassTransit - 多个消费者都可以收到相同的消息吗?
【发布时间】:2019-07-25 20:36:36
【问题描述】:

我有一个 .NET 4.5.2 Service Publishing 消息通过 MassTransit 到 RabbitMq。

以及消耗这些消息的 .NET Core 2.1 服务的多个实例。

目前 .NET 核心消费者服务的竞争实例从其他实例窃取消息。

即第一个消费消息的人将其从队列中取出,其余的服务实例不会消费它。

我希望 ALL 个实例使用相同的消息。

我怎样才能做到这一点?

Publisher Service 配置如下:

 builder.Register(context =>
            {
                MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);

                return Bus.Factory.CreateUsingRabbitMq(configurator =>
                {
                    configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
                    {
                        host.Username(***);
                        host.Password(***);
                    });
                    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                    configurator.Publish<MyWrapper>(x =>
                    {
                        x.AutoDelete = true;
                        x.Durable = true;
                        x.ExchangeType = true;
                    });

                });
            })
            .As<IBusControl>()
            .As<IBus>()
            .SingleInstance();

而 .NET Core Consumer Services 的配置如下:

        serviceCollection.AddScoped<MyWrapperConsumer>();

        serviceCollection.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
                {
                    hostConfigurator.Username(***);
                    hostConfigurator.Password(***);

                });
                cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
        serviceCollection.AddSingleton<IHostedService, BusService>();

然后 MyWrapperConsumer 看起来像这样:

public class MyWrapperConsumer :
    IConsumer<MyWrapper>
{
    .
    .

    public MyWrapperConsumer(...) => (..) = (..);

    public async Task Consume(ConsumeContext<MyWrapper> context)
    {
        //Do Stuff 
    }
}

【问题讨论】:

    标签: c# .net .net-core rabbitmq masstransit


    【解决方案1】:

    听起来您想要发布消息并让多个消费者服务实例接收它们。在这种情况下,每个服务实例都需要有自己的队列。这样,每条发布的消息都会导致将副本传递到每个队列。然后,每个接收端点将从自己的队列中读取该消息并使用它。

    您所做的所有过度配置都与您想要的背道而驰。要使其工作,请删除所有交换类型配置,只需为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。

    可以看到RabbitMQ拓扑是如何配置的:https://masstransit-project.com/advanced/topology/rabbitmq.html

    【讨论】:

    • 感谢您的回复。我很难得到这个,这是我的错。新手。通过将我的消费服务接收端点更改为以下内容,我已经尝试了我认为您所说的内容。但没有运气。如果您能回答我应该进行的配置更改的代码示例,我将不胜感激?无法弄清楚如何将每个队列绑定到发布者交换。谢谢 cfg.ReceiveEndpoint(host, "my.exchange.123", exchangeConfigurator => { exchangeConfigurator.Consumer(provider); });
    • 暂时忘记交换,每个具有不同代码来消费该消息的服务应该有一个不同的端点名称,它将获得自己的消息。具有相同名称的端点将成为一个队列中的竞争消费者。所有这些模式都在企业集成模式热潮中进行了描述,具体案例在 MT 文档的 Common Gotchas 部分中有所提及。
    • 谢谢阿列克谢。我想我需要在这里发布一个具体的代码示例。我已经阅读了常见的陷阱,但无法将其转换为代码。当我将消费者服务的配置更改为我在之前评论中发布的内容时,消费者停止消费。
    • 绅士们,我赞成你们的答案和 cmets。并将我自己的答案标记为答案,因为它包含特定代码。 (有任何问题让我知道)谢谢你的建议,花了我一段时间,但我在你的帮助下到达了那里。
    • @chris-patterson 我想我误解了发布和发送之间的区别,虽然我认为我明白了:) ...我认为发布是将消息发送给您的所有消费者发送只会将其发送给其中一个。但是,那有什么区别呢?
    【解决方案2】:

    感谢 Chris Patterson 的回答和 Alexey Zimarev 的评论,我现在相信我已经完成了这项工作。

    这些家伙指出(根据我的理解,如果我错了,请纠正我)我应该自己摆脱指定 Exchange 和 Queues 等,并且不再对我的配置进行如此精细的配置。

    让 MassTransit 根据我的类型 MyWrapper 了解要创建和发布到哪个交换,以及要创建和绑定到该交换的队列。还有我的IConsumerimplementation 类型MyWrapperConsumer

    然后为每个消费者服务提供其自己唯一的 ReceiveEndpoint 名称,我们最终将交换扇出 MyWrapper 类型的消息到每个由指定的唯一名称创建的唯一队列。

    所以,就我而言..

    发布者服务配置相关代码行更改自:

        configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                configurator.Publish<MyWrapper>(x =>
                {
                    x.AutoDelete = true;
                    x.Durable = true;
                    x.ExchangeType = true;
                });
    

    到这里

           configurator.Message<MyWrapper>(x => { });
           configurator.AutoDelete = true;
    

    并且每个消费者服务实例配置相关的代码行更改自:

            cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                    {
                        exchangeConfigurator.AutoDelete = true;
                        exchangeConfigurator.Durable = true;
                        exchangeConfigurator.ExchangeType = "topic";
                        exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                    });
    

    对此:

            cfg.ReceiveEndpoint(host, Environment.MachineName, queueConfigurator =>
                    {
                        queueConfigurator.AutoDelete = true;
                        queueConfigurator.Consumer<MyWrapperConsumer>(provider);
                    });
    

    注意,Environment.MachineName 为每个实例提供了唯一的队列名称

    【讨论】:

      【解决方案3】:

      我们可以通过为每个消费者服务设置单独的队列并且每个队列与单个交换绑定来实现它。当我们发布消息进行交换时,它会将消息副本发送到每个队列并最终被每个消费者服务接收。

      消息:

          namespace Masstransit.Message
      {
          public interface ICustomerRegistered
          {
              Guid Id { get; }
              DateTime RegisteredUtc { get; }
              string Name { get; }
              string Address { get; }
          }
      }
      
      namespace Masstransit.Message
      {
          public interface IRegisterCustomer
          {
              Guid Id { get; }
              DateTime RegisteredUtc { get; }
              string Name { get; }
              string Address { get; }
          }
      }
      

      发布者控制台应用程序:

      namespace Masstransit.Publisher
      {
          class Program
          {
              static void Main(string[] args)
              {
                  Console.WriteLine("CUSTOMER REGISTRATION COMMAND PUBLISHER");
                  Console.Title = "Publisher window";
                  RunMassTransitPublisher();
              }
      
              private static void RunMassTransitPublisher()
              {
                  string rabbitMqAddress = "rabbitmq://localhost:5672";
                  string rabbitMqQueue = "mycompany.domains.queues";
                  Uri rabbitMqRootUri = new Uri(rabbitMqAddress);
      
                  IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
                  {
                      rabbit.Host(rabbitMqRootUri, settings =>
                      {
                          settings.Password("guest");
                          settings.Username("guest");
                      });
                  });
      
                  Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
                  ISendEndpoint sendEndpoint = sendEndpointTask.Result;
      
                  Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
                  {
                      Address = "New Street",
                      Id = Guid.NewGuid(),                
                      RegisteredUtc = DateTime.UtcNow,
                      Name = "Nice people LTD"                            
                  }, c =>
                  {
                      c.FaultAddress = new Uri("rabbitmq://localhost:5672/accounting/mycompany.queues.errors.newcustomers");
                  });
      
                  Console.ReadKey();
              }
          }
      }
      

      接收器管理控制台应用程序:

      namespace Masstransit.Receiver.Management
      {
          class Program
          {
              static void Main(string[] args)
              {
                  Console.Title = "Management consumer";
                  Console.WriteLine("MANAGEMENT");
                  RunMassTransitReceiver();
              }
      
              private static void RunMassTransitReceiver()
              {
                  IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
                  {
                      rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
                      {
                          settings.Password("guest");
                          settings.Username("guest");
                      });
      
                      rabbit.ReceiveEndpoint("mycompany.domains.queues.events.mgmt", conf =>
                      {
                          conf.Consumer<CustomerRegisteredConsumerMgmt>();
                      });
                  });
                  rabbitBusControl.Start();
                  Console.ReadKey();
                  rabbitBusControl.Stop();
              }
          }
      }
      

      Receiver Sales Console 应用程序:

      namespace Masstransit.Receiver.Sales
      {
          class Program
          {
              static void Main(string[] args)
              {
                  Console.Title = "Sales consumer";
                  Console.WriteLine("SALES");
                  RunMassTransitReceiver();
              }
      
              private static void RunMassTransitReceiver()
              {
                  IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
                  {
                      rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
                      {
                          settings.Password("guest");
                          settings.Username("guest");
                      });
      
                      rabbit.ReceiveEndpoint("mycompany.domains.queues.events.sales", conf =>
                      {
                          conf.Consumer<CustomerRegisteredConsumerSls>();
                      });
                  });
      
                  rabbitBusControl.Start();
                  Console.ReadKey();
      
                  rabbitBusControl.Stop();
              }
          }
      }
      

      您可以在 https://github.com/prasantj409/Masstransit-PublishMultipleConsumer.git 上找到可行的解决方案

      【讨论】:

      • 虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接答案可能会失效。 - From Review
      • @Labu:这显然不仅仅是只是一个链接。如果您删除链接,它仍然提供解决方案。也就是说,作者在链接到自己的外部内容时应注意阅读Stack Overflow's rules on self-promotion
      • 当然 - 问题是这是我必须为审核任务选择的唯一真正选项。唯一其他相关的是“没有评论”——这似乎无济于事。
      • @Labu:原因只是为了帮助提供共同的反馈。如果没有罐装 cmets 适合,您应该添加自己的评论。也就是说,在这种情况下,更合适的选择可能是将答案重新标记为垃圾邮件,因为它链接到作者自己的存储库而没有透露他们的隶属关系。
      【解决方案4】:

      默认情况下,RabbitMQ 将每条消息按顺序发送给所有消费者。这种类型的调度称为“循环”,用于负载平衡(您可以让服务的多个实例使用相同的消息)。 正如 Chris 指出的那样,为确保您的服务始终接收消息副本,您需要提供唯一的队列名称。

      【讨论】:

        【解决方案5】:

        我想分享一个稍微不同的代码示例。 实例ID:

        指定唯一标识端点的标识符 实例,附加到端点名称的末尾。

          services.AddMassTransit(x => {
            x.SetKebabCaseEndpointNameFormatter();
            Guid instanceId = Guid.NewGuid();
            x.AddConsumer<MyConsumer>()
              .Endpoint(c => c.InstanceId = instanceId.ToString());
        
            x.UsingRabbitMq((context, cfg) => {
              ...
              cfg.ConfigureEndpoints(context);
            });
          });
        

        【讨论】:

          【解决方案6】:

          你需要做什么:

          1. 确保您的消费者使用相同的泛型类型实现IConsumer 接口
          2. 注册所有这些消费者
          3. 使用Publish方法发送消息

          MassTransit 中通常有两种类型的消息:事件和命令,在这种情况下,您的消息是事件。如果你的消息是命令,只有一个消费者接收消息,你需要使用Send方法。

          事件 DTO 示例:

          public class OrderChecked
          {
              public Guid OrderId { get; set; }
          }
          

          消费者:

          public class OrderSuccessfullyCheckedConsumer : IConsumer<OrderChecked>
          {
              public async Task Consume(ConsumeContext<OrderChecked> context)
              {
                  // some your consuming code
              }
          }
          
          public class OrderSuccessfullyCheckedConsumer2 : IConsumer<OrderChecked>
          {
              public async Task Consume(ConsumeContext<OrderChecked> context)
              {
                  // some your second consuming code
              }
          }
          

          配置:

          services.AddMassTransit(c =>
          {
              c.AddConsumer<OrderSuccessfullyCheckedConsumer>();
              c.AddConsumer<OrderSuccessfullyCheckedConsumer2>();
                      
              c.SetKebabCaseEndpointNameFormatter();
              c.UsingRabbitMq((context, cfg) =>
              {
                  cfg.ConfigureEndpoints(context);
              });
          });
          services.AddMassTransitHostedService(true);
          

          发布消息:

          var endpoint = await _bus.GetPublishSendEndpoint<OrderChecked>();
          await endpoint.Send(new OrderChecked
          {
              OrderId = newOrder.Id
          });
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2017-02-05
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2016-06-04
            • 2018-07-02
            相关资源
            最近更新 更多