【发布时间】:2016-09-16 09:06:13
【问题描述】:
我正在尝试使用 RabbitMQ 实现一个解决方案,以实现分布式 RPC 之类的东西,只使用一个请求和一个响应队列来处理大量处理器,我已经使用 Apache Apollo 实现了这样一个解决方案,我会喜欢的能够将其迁移到 RabbitMQ。以下是重点:
- 每台服务器都连接到请求队列
- 每个服务器只处理应该为他提供的请求(标头字段)
在我对 Apollo 的实现中,关键点是选择器的使用(例如标头字段值的 where 子句),我认为这是在 RabbitMQ 中通过路由和路由键实现的,但我一定是错的,因为我看到工人接收不应该发给他们的消息。
为了重现问题,我修改了路由示例 (http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html),我有两个消费者,我可以从定义 routingKey 的不同参数开始,以及一个为其中一个消费者生成消息的生产者。我看到的行为是消息的消费似乎是随机的(“John”的消息第一次由“John”的消费者处理,第二次由“Mary”的消费者处理)
有人对在 RabbitMQ 中使用选择器有任何指示或代码 sn-ps 吗?
在我的消费者代码下方:
public static void Main( String[] args )
{
var factory = new ConnectionFactory { HostName = "localhost" };
using ( var connection = factory.CreateConnection() )
using ( var channel = connection.CreateModel() )
{
const String request = "request";
channel.ExchangeDeclare( request, "direct" );
channel.QueueDeclare( request, true, false, false, null );
if ( args.Length < 1 )
{
Console.WriteLine( " Press [enter] to exit." );
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
var myRoutingKey = args[0];
channel.QueueBind( request, request, myRoutingKey );
Console.WriteLine( $" [*] Waiting for messages for {myRoutingKey}." );
var consumer = new EventingBasicConsumer( channel );
consumer.Received += ( model, ea ) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString( body );
var routingKey = ea.RoutingKey;
Console.WriteLine( $" [x] Received '{routingKey}':'{message}'" );
};
channel.BasicConsume( request, true, consumer );
Console.WriteLine( " Press [enter] to exit." );
Console.ReadLine();
}
}
对于生产者:
public static void Main( String[] args )
{
var factory = new ConnectionFactory { HostName = "localhost" };
using ( var connection = factory.CreateConnection() )
using ( var channel = connection.CreateModel() )
{
const String request = "request";
channel.ExchangeDeclare( request, "direct" );
channel.QueueDeclare( request, true, false, false, null );
var routingKey = args.Length > 0 ? args[0] : "John";
const String message = "Hi";
var body = Encoding.UTF8.GetBytes( message );
channel.BasicPublish( request, routingKey, null, body );
Console.WriteLine( $" [x] Sent '{routingKey}':'{message}'" );
}
Console.WriteLine( " Press [enter] to exit." );
Console.ReadLine();
}
提前致谢。
【问题讨论】: