一. Direct-Exchange模式

1. 含义

 交换机类型设置为:ExchangeType.Direct

 交换机和队列通过routingKey(路由key)进行绑定,发消息的时候每条消息也要指定routingKey(路由key),然后交换机根据该路由key进行匹配,该key绑定了几个Queue那么该条消息就同时发送到几个队列中

2. 使用场景

 通过消息队列来写日志;

 Info debug error warn :记录下来

 error: 除了记录下来,还需要特殊处理,可能需要发送一个信息,发送一个邮件;

解决方案:通过路由key匹配不同的队列

 队列1:专门用来记录日志

 队列2:专门用来发邮件,发信息

3. 代码分享

生产者

 /// <summary>
    /// DirectExchange路由
    /// </summary>
    public class DirectExchange
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //声明两个队列
                    channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
                    channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //1个路由
                    channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //4种路由key统一绑定DirectExchangeLogAllQueue队列,
                    string[] logtypes = new string[] { "debug", "info", "warn", "error" };
                    foreach (string logtype in logtypes)
                    {
                        channel.QueueBind(queue: "DirectExchangeLogAllQueue",
                                exchange: "DirectExChange",
                                routingKey: logtype);
                    }
                    //路由key“error”再次绑定DirectExchangeErrorQueue队列
                    channel.QueueBind(queue: "DirectExchangeErrorQueue",
                              exchange: "DirectExChange",
                              routingKey: "error");
                    List<LogMsgModel> logList = new List<LogMsgModel>();
                    for (int i = 1; i <=20; i++)
                    {
                        if (i % 4 == 0)
                        {
                            logList.Add(new LogMsgModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") });
                        }
                        if (i % 4 == 1)
                        {
                            logList.Add(new LogMsgModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") });
                        }
                        if (i % 4 == 2)
                        {
                            logList.Add(new LogMsgModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") });
                        }
                        if (i % 4 == 3)
                        {
                            logList.Add(new LogMsgModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") });
                        }
                    }
                     
                    Console.WriteLine("生产者发送20条日志信息");
                    //发送日志信息
                    foreach (var log in logList)
                    {
                        channel.BasicPublish(exchange: "DirectExChange",
                                            routingKey: log.LogType,
                                            basicProperties: null,
                                            body: log.Msg);
                        Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)}  已发送~~");
                    }
                     
                }
            }
        }


        public class LogMsgModel
        {
            public string LogType { get; set; }
            public byte[] Msg { get; set; }
        }
    }
View Code

相关文章: