【发布时间】:2017-05-02 17:45:09
【问题描述】:
我正在用 C#/.NET(使用 EasyNetQ 库)在 RabbitMQ 中验证基于主题的路由概念。对于我的测试,我有一个交换器通过主题路由(“TopicA”和“TopicB”)绑定到两个持久队列。
这里是生产者的代码(一个 C# 控制台应用程序):
using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
{
Random random = new Random();
Foo foo; // my test message class
for (int i = 0; i < 100; i++)
{
int coin = random.Next(0, 2);
if (coin == 0)
{
foo = new Foo() { Payload = "Heads" };
bus.Publish(foo, "TopicA");
Console.WriteLine($"Published message {i} to TopicA.");
}
else
{
foo = new Foo() { Payload = "Tails" };
bus.Publish(foo, "TopicB");
Console.WriteLine($"Published message {i} to TopicB.");
}
}
}
这是我的消费者代码(也是一个 C# 控制台应用程序):
class Program
{
static void Main(string[] args)
{
TestRabbitMQSubscribe();
Console.ReadKey(false);
}
private static void TestRabbitMQSubscribe()
{
using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
{
bus.Subscribe<Foo>("TopicA", HandleFooA, config => config.WithTopic("TopicA"));
bus.Subscribe<Foo>("TopicB", HandleFooB, config => config.WithTopic("TopicB"));
}
}
private static void HandleFooA(Foo foo)
{
Console.WriteLine($"Received {foo.Payload} from TopicA.");
File.AppendAllText(@"c:\heads.txt", foo.Payload + Environment.NewLine);
}
private static void HandleFooB(Foo foo)
{
Console.WriteLine($"Received {foo.Payload} from TopicB.");
File.AppendAllText(@"c:\tails.txt", foo.Payload + Environment.NewLine);
}
}
生产者代码运行没有问题,然后我可以验证(使用 RabbitMQ 管理 UI)TopicA 队列包含(例如)47 条消息,而 TopicB 队列包含 53 条消息,如预期的那样。
然后我运行消费者代码,它似乎拉入了 TopicA 队列中的所有消息并将它们写入适当的文件。然而,它要么只接收到 TopicB 队列中的少量消息,要么根本不接收。然后它将在Console.ReadKey() 调用处停止。
如果我颠倒 bus.Subscribe() 调用的顺序,它将从 TopicB 中提取消息,但不会从 TopicA 中提取。
我觉得我一定是遗漏了一些简单的东西(例如阻塞调用),或者我从根本上误解了一些 RabbitMQ 或 EasyNetQ 概念。
【问题讨论】:
-
看起来,一旦消息全部产生,生产者
using块就被允许退出。这将使队列死亡。 -
这个运气好吗?
-
@MarkLarter,我还没有机会重温它,但感谢您的回答——这是我将调查的第一个途径。