【问题标题】:RabbitMQ: using Routing to achieve message selectionRabbitMQ:使用路由实现消息选择
【发布时间】:2016-09-16 09:06:13
【问题描述】:

我正在尝试使用 RabbitMQ 实现一个解决方案,以实现分布式 RPC 之类的东西,只使用一个请求和一个响应队列来处理大量处理器,我已经使用 Apache Apollo 实现了这样一个解决方案,我会喜欢的能够将其迁移到 RabbitMQ。以下是重点:

  • 每台服务器都连接到请求队列
  • 每个服务器只处理应该为他提供的请求(标​​头字段)

在我对 Apollo 的实现中,关键点是选择器的使用(例如标头字段值的 where 子句),我认为这是在 RabbitMQ 中通过路由和路由键实现的,但我一定是错的,因为我看到工人接收不应该发给他们的消息。

为了重现问题,我修改了路由示例 (http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html),我有两个消费者,我可以从定义 routingKey 的不同参数开始,以及一个为其中一个消费者生成消息的生产者。我看到的行为是消息的消费似乎是随机的(“John”的消息第一次由“John”的消费者处理,第二次由“Mary”的消费者处理)

有人对在 RabbitMQ 中使用选择器有任何指示或代码 sn-ps 吗?

在我的消费者代码下方:

public static void Main( String[] args )
{
    var factory = new ConnectionFactory { HostName = "localhost" };
    using ( var connection = factory.CreateConnection() )
        using ( var channel = connection.CreateModel() )
        {
            const String request = "request";
            channel.ExchangeDeclare( request, "direct" );

            channel.QueueDeclare( request, true, false, false, null );

            if ( args.Length < 1 )
            {
                Console.WriteLine( " Press [enter] to exit." );
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            var myRoutingKey = args[0];
            channel.QueueBind( request, request, myRoutingKey );

            Console.WriteLine( $" [*] Waiting for messages for {myRoutingKey}." );

            var consumer = new EventingBasicConsumer( channel );
            consumer.Received += ( model, ea ) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString( body );
                var routingKey = ea.RoutingKey;
                Console.WriteLine( $" [x] Received '{routingKey}':'{message}'" );
            };
            channel.BasicConsume( request, true, consumer );

            Console.WriteLine( " Press [enter] to exit." );
            Console.ReadLine();
        }
}

对于生产者:

public static void Main( String[] args )
{
    var factory = new ConnectionFactory { HostName = "localhost" };
    using ( var connection = factory.CreateConnection() )
        using ( var channel = connection.CreateModel() )
        {
            const String request = "request";
            channel.ExchangeDeclare( request, "direct" );

            channel.QueueDeclare( request, true, false, false, null );

            var routingKey = args.Length > 0 ? args[0] : "John";

            const String message = "Hi";
            var body = Encoding.UTF8.GetBytes( message );
            channel.BasicPublish( request, routingKey, null, body );
            Console.WriteLine( $" [x] Sent '{routingKey}':'{message}'" );
        }

    Console.WriteLine( " Press [enter] to exit." );
    Console.ReadLine();
}

提前致谢。

【问题讨论】:

    标签: c# queue rabbitmq


    【解决方案1】:

    我可以猜到为什么这对你不起作用。关键是您的消费者中的这两行。

    channel.QueueDeclare( request, true, false, false, null );
    channel.QueueBind( request, request, myRoutingKey );
    

    事实上,“请求”是“所有”消费者的队列名称。如果您使用多个路由键运行此程序设置多个绑定,最终结果是名为“请求”的队列使用多个路由键(例如“John”、“Mary”)绑定到您的交换。请记住,当您执行此绑定时,绑定在 RabbitMQ 服务器中不是瞬态的,它们会一直存在。

    现在回到如何解决您的问题。有多种选择,但这里是其中之一。首先我推荐阅读RabbitMQ Model

    您可以使用包含这些行的相同教程代码来代替您的:

     var queueName = channel.QueueDeclare().QueueName;
     channel.QueueBind( queueName, request, myRoutingKey );
    

    但是上面的意思是每次你运行你的消费者程序时,都会创建一个新队列,并使用你想要的路由键绑定到交换器。另一种方法是使用您之前使用的相同代码,但只需适当地选择您的队列名称而不是固定的队列名称。例如,每个路由键可以有一个队列

    var queueName = myRoutingKey ;
    channel.QueueDeclare( queueName, true, false, false, null );
    channel.QueueBind( queueName, request, myRoutingKey );
    

    或者,您可以将多个路由键分组到一个队列中,类似于教程示例。

    关键是你需要做的事情不能用一个队列来完成。 (除了在使用消息时过滤掉消息)。但这听起来对你来说不是一个真正的要求。您问的是每个消费者服务器只处理您可以在此模型中执行的相关消息。生产者只发布到那个交易所(这是你想要的)。

    【讨论】:

    • 您好阿明,感谢您的回答。我还发现使用动态生成的队列不会出现问题。无论如何,我一直在寻找一个只有一个请求和一个响应队列的解决方案,这既是出于性能问题,也是因为我之前与另一个经纪人一起实施的另一个解决方案就是这种解决方案。识别出不同的模型后,后者不是问题,只需要重新设计,但我肯定需要验证性能。
    • 此外,通过使用动态生成的队列,我丢失了来自生产者的所有消息,这些消息在路由消费者未运行时生成。我也尝试将交换声明为持久的,但它没有帮助。
    • 动态生成的队列对您来说基本上是临时队列,因为您将丢失我假设的队列名称?我不相信动态队列名称将成为您只需使用一致名称的解决方案(将它们视为您的选择器)。这样做不会影响性能。这就是 RabbitMQ 的设计目的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-10
    • 2016-11-24
    • 1970-01-01
    • 2012-07-25
    • 1970-01-01
    相关资源
    最近更新 更多