【问题标题】:Subscribing to multiple RabbitMQ queues using EasyNetQ使用 EasyNetQ 订阅多个 RabbitMQ 队列
【发布时间】:2017-05-02 17:45:09
【问题描述】:

我正在用 C#/.NET(使用 EasyNetQ 库)在 RabbitMQ 中验证基于主题的路由概念。对于我的测试,我有一个交换器通过主题路由(“TopicA”和“TopicB”)绑定到两个持久队列。

这里是生产者的代码(一个 C# 控制台应用程序):

using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
{
    Random random = new Random();
    Foo foo; // my test message class

    for (int i = 0; i < 100; i++)
    {
        int coin = random.Next(0, 2);

        if (coin == 0)
        {
            foo = new Foo() { Payload = "Heads" };
            bus.Publish(foo, "TopicA");
            Console.WriteLine($"Published message {i} to TopicA.");
        }
        else
        {
            foo = new Foo() { Payload = "Tails" };
            bus.Publish(foo, "TopicB");
            Console.WriteLine($"Published message {i} to TopicB.");
        }
    }
}

这是我的消费者代码(也是一个 C# 控制台应用程序):

class Program
{
    static void Main(string[] args)
    {
        TestRabbitMQSubscribe();
        Console.ReadKey(false);
    }

    private static void TestRabbitMQSubscribe()
    {
        using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
        {
            bus.Subscribe<Foo>("TopicA", HandleFooA, config => config.WithTopic("TopicA"));
            bus.Subscribe<Foo>("TopicB", HandleFooB, config => config.WithTopic("TopicB"));
        }
    }

    private static void HandleFooA(Foo foo)
    {
        Console.WriteLine($"Received {foo.Payload} from TopicA.");
        File.AppendAllText(@"c:\heads.txt", foo.Payload + Environment.NewLine);
    }

    private static void HandleFooB(Foo foo)
    {
        Console.WriteLine($"Received {foo.Payload} from TopicB.");
        File.AppendAllText(@"c:\tails.txt", foo.Payload + Environment.NewLine);
    }
}

生产者代码运行没有问题,然后我可以验证(使用 RabbitMQ 管理 UI)TopicA 队列包含(例如)47 条消息,而 TopicB 队列包含 53 条消息,如预期的那样。

然后我运行消费者代码,它似乎拉入了 TopicA 队列中的所有消息并将它们写入适当的文件。然而,它要么只接收到 TopicB 队列中的少量消息,要么根本不接收。然后它将在Console.ReadKey() 调用处停止。

如果我颠倒 bus.Subscribe() 调用的顺序,它将从 TopicB 中提取消息,但不会从 TopicA 中提取。

我觉得我一定是遗漏了一些简单的东西(例如阻塞调用),或者我从根本上误解了一些 RabbitMQ 或 EasyNetQ 概念。

【问题讨论】:

  • 看起来,一旦消息全部产生,生产者using 块就被允许退出。这将使队列死亡。
  • 这个运气好吗?
  • @MarkLarter,我还没有机会重温它,但感谢您的回答——这是我将调查的第一个途径。

标签: c# .net rabbitmq easynetq


【解决方案1】:

这似乎只需要对您的代码进行一些调整,主要是围绕放置 ReadLine() 以防止 Producer 退出的位置。在实际实现中,会有一些其他机制保持生产者处于活动状态,从而确保队列的持久性。

我认为主要思想是在消费者订阅从队列中读取所有内容之前不应终止生产者发布连接。

您可以在解决方案中将 Producer 和 Consumer 都设置为启动项目并同时运行它们,或者可以运行 Producer(但不要按 Enter),然后稍后可以运行 Consumer。只要 Producer using 块没有超出范围,一切都是好的和持久的。

完整发布代码只是为了确保:

制片人:

using System;
using EasyNetQ;
using Messages;

namespace Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=zzz;username=zzz;password=zzz"))
            {
                var random = new Random();
                for (var i = 1; i <= 100; ++i)
                {
                    var coin = random.Next(0, 2);
                    if (coin == 0)
                    {
                        bus.Publish(new CoinFlipMessage { Payload = "Heads" }, CoinFlipMessage.HeadsTopic);
                        Console.WriteLine($"Published message {i} to {CoinFlipMessage.HeadsTopic}");
                    }
                    else
                    {
                        bus.Publish(new CoinFlipMessage { Payload = "Tails" }, CoinFlipMessage.TailsTopic);
                        Console.WriteLine($"Published message {i} to {CoinFlipMessage.TailsTopic}.");
                    }
                }
                Console.ReadLine();
            }
        }
    }
}

消费者:

using System;
using System.IO;
using EasyNetQ;
using Messages;

namespace Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=zzz;username=zzz;password=zzz"))
            {

                bus.Subscribe<CoinFlipMessage>(CoinFlipMessage.HeadsTopic, HandleHeads, config => config.WithTopic(CoinFlipMessage.HeadsTopic));
                bus.Subscribe<CoinFlipMessage>(CoinFlipMessage.TailsTopic, HandleTails, config => config.WithTopic(CoinFlipMessage.TailsTopic));
                Console.ReadLine();
            }
        }

        private static void HandleHeads(CoinFlipMessage message)
        {
            if (message == null) return;
            headsCount++;
            var payload = message.Payload;
            Console.WriteLine($"Received {payload} {headsCount} from {CoinFlipMessage.HeadsTopic}.");
            File.AppendAllText(@"heads.txt", payload + Environment.NewLine);
        }

        private static void HandleTails(CoinFlipMessage message)
        {
            if (message == null) return;
            tailsCount++;
            var payload = message.Payload;
            Console.WriteLine($"Received {payload} {tailsCount} from {CoinFlipMessage.TailsTopic}.");
            File.AppendAllText(@"tails.txt", payload + Environment.NewLine);
        }

        private static int headsCount;
        private static int tailsCount;
    }
}

消息:

使用系统;

namespace Messages
{
    public class CoinFlipMessage
    {
        public string Payload { get; set; }

        public static string HeadsTopic = "TopicHeads";
        public static string TailsTopic = "TopicTails";
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-11-29
    • 1970-01-01
    • 2013-07-05
    • 2016-09-18
    • 1970-01-01
    • 1970-01-01
    • 2011-12-18
    相关资源
    最近更新 更多