一. Direct-Exchange模式
1. 含义
交换机类型设置为:ExchangeType.Direct
交换机和队列通过routingKey(路由key)进行绑定,发消息的时候每条消息也要指定routingKey(路由key),然后交换机根据该路由key进行匹配,该key绑定了几个Queue,那么该条消息就同时发送到几个队列中。
2. 使用场景
通过消息队列来写日志;
Info debug error warn :记录下来
error: 除了记录下来,还需要特殊处理,可能需要发送一个信息,发送一个邮件;
解决方案:通过路由key匹配不同的队列
队列1:专门用来记录日志
队列2:专门用来发邮件,发信息
3. 代码分享
生产者
/// <summary> /// DirectExchange路由 /// </summary> public class DirectExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //声明两个队列 channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //1个路由 channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //4种路由key统一绑定DirectExchangeLogAllQueue队列, string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string logtype in logtypes) { channel.QueueBind(queue: "DirectExchangeLogAllQueue", exchange: "DirectExChange", routingKey: logtype); } //路由key“error”再次绑定DirectExchangeErrorQueue队列 channel.QueueBind(queue: "DirectExchangeErrorQueue", exchange: "DirectExChange", routingKey: "error"); List<LogMsgModel> logList = new List<LogMsgModel>(); for (int i = 1; i <=20; i++) { if (i % 4 == 0) { logList.Add(new LogMsgModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") }); } if (i % 4 == 1) { logList.Add(new LogMsgModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") }); } if (i % 4 == 2) { logList.Add(new LogMsgModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") }); } if (i % 4 == 3) { logList.Add(new LogMsgModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") }); } } Console.WriteLine("生产者发送20条日志信息"); //发送日志信息 foreach (var log in logList) { channel.BasicPublish(exchange: "DirectExChange", routingKey: log.LogType, basicProperties: null, body: log.Msg); Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已发送~~"); } } } } public class LogMsgModel { public string LogType { get; set; } public byte[] Msg { get; set; } } }