官网:https://www.rabbitmq.com/dotnet-api-guide.html
C# rabbitMQ示例
生产者:推送消息
public class RabbitMqPulish { /// <summary> /// 推送 /// </summary> /// <param name="value"></param> public void Producer(int value) { try { var queueName = "test01"; var exchangeName = "changeName01"; var exchangeType = ExchangeType.Fanout; //fanout、topic、fanout direct var routingKey = "*"; var uri = new Uri("amqp://127.0.0.1:5672/"); var factory = new ConnectionFactory { UserName = "admin", Password = "admin", RequestedHeartbeat = TimeSpan.FromMilliseconds(0), Endpoint = new AmqpTcpEndpoint(uri) }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //设置交换器的类型 channel.ExchangeDeclare(exchangeName, exchangeType); //声明一个队列,设置队列是否持久化,排他性,与自动删除 channel.QueueDeclare(queueName, true, false, false, null); //绑定消息队列,交换器,routingkey channel.QueueBind(queueName, exchangeName, routingKey); //channel.QueueBind("test02", exchangeName, routingKey); //绑定多个队列,多端接收 channel.ConfirmSelect(); channel.BasicAcks += (sender ,e)=>{ Console.WriteLine($"BasicAcks..Pulish..{e.DeliveryTag}"); }; var properties = channel.CreateBasicProperties(); //队列持久化 properties.Persistent = true; properties.DeliveryMode = 2; //发送具有传递模式 2(持久性) //properties.Expiration = "36000000"; for (int i = 0; i < value; i++) { var pubMessage = new MqMessage() { MsgId = Guid.NewGuid().ToString(), PushTime = DateTime.Now, SendSortNo = i, Message = "测试发送消息 " + i + "" }; //JsonConvert.DeserializeObject var body = Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(pubMessage)); //发送信息 channel.BasicPublish(exchangeName, routingKey, properties, body); Console.WriteLine($"sending...{i}..." + System.Text.Json.JsonSerializer.Serialize(pubMessage)); System.Threading.Thread.Sleep(1000); } } } } catch (Exception ex) { Console.WriteLine(ex.Message); } } /// <summary> /// 推送Demo /// </summary> /// <param name="value"></param> public void PublishDemo(int value = 100) { #region 使用端点列表 //var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> { // new AmqpTcpEndpoint("hostname"), // new AmqpTcpEndpoint("localhost") //}; //IConnection conn = factory.CreateConnection(endpoints); #endregion var queueName = "test02"; var exchangeName = "changeName02"; //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct var routingKey = "*"; ConnectionFactory factory = new ConnectionFactory(); factory.VirtualHost = "/"; factory.UserName = "admin"; factory.Password = "admin"; factory.Uri = new Uri("amqp://127.0.0.1:5672/"); //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等体建议完全禁用心跳 //将超时设置为 60 秒 factory.RequestedHeartbeat = TimeSpan.FromSeconds(60); //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。 factory.ClientProvidedName = "app:audit component:event-consumer"; IConnection conn = factory.CreateConnection(); //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短. //每次操作关闭和打开新通道通常是不必要的,但可能是适当的 IModel channel = conn.CreateModel(); channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); ////x-max-priority属性必须设置,否则消息优先级不生效 //IDictionary<string, object> dicQueue = new Dictionary<string, object>(); //dicQueue.Add("x-max-priority", 50); //dicQueue.Add("x-message-ttl", 60000); //队列超时 ///// new Dictionary<string, object> { { "x-max-priority", 50 } //channel.QueueDeclare(queueName, true, false, false, dicQueue); channel.QueueDeclare(queueName, true, false, false, null); ////声明一个队列并指示服务器不发送任何响应 //channel.QueueDeclareNoWait(queueName, true, false, false, null); channel.QueueBind(queueName, exchangeName, routingKey, null); ////推送的确认 //channel.ConfirmSelect(); //channel.BasicAcks += (sender, e) => { // Console.WriteLine($"BasicAcks..Pulish..{e.DeliveryTag}"); //}; ////客户端可以订阅 IModel.BasicReturn事件。如果没有附加到事件的侦听器,则返回的消息将被静默丢弃。 //channel.BasicReturn += (sender, e) => { // Console.WriteLine($"Pulish..处理不可路由的消息..{e.Exchange}"); //}; //IModel#QueueDeclarePassive和IModel#ExchangeDeclarePassive是用于被动声明的方法 var response = channel.QueueDeclarePassive(queueName); // 返回队列中处于就绪状态的消息数 var messageCount = response.MessageCount; // 返回队列拥有的消费者数量 var consumerCount = response.ConsumerCount; Console.WriteLine($"队列:{queueName},就绪状态的消息数:{messageCount},消费者数量:{consumerCount}"); var props = channel.CreateBasicProperties(); //队列持久化 props.Persistent = true; //props.ContentType = "text/plain"; props.DeliveryMode = 2; //发送具有传递模式 2(持久性) //props.Expiration = "36000000"; //每条消息超时 props.Priority = (byte)10;//设置消息优先级 for (int i = 0; i < value; i++) { //Publishing Messages 方式1: //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes); //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性 var strMsg = $"Hello, world! ....{i}....{DateTime.Now}"; byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg); channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes); Console.WriteLine($"sending...{i}...{strMsg}" ); System.Threading.Thread.Sleep(1000); } //要断开连接,只需关闭通道和连接: channel.Close(); conn.Close(); #region 一些设置 //type:可选项为,fanout,direct,topic,headers。区别如下: //fanout:发送到所有与当前Exchange绑定的Queue中 //direct:发送到与消息的routeKey相同的Rueue中 //topic:fanout的模糊版本 //headers:发送到与消息的header属性相同的Queue中 //durable:持久化 //autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。 //exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失 ////显式删除队列或交换: //channel.QueueDelete("queue-name", false, false); ////只有当队列为空时才可以删除队列 //channel.QueueDelete("queue-name", false, true); ////或者如果它没有被使用(没有任何消费者) //channel.QueueDelete("queue-name", true, false); ////可以清除队列(删除其所有消息): //channel.QueuePurge("队列名称"); ////带有自定义标头的消息 //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); //IBasicProperties props = channel.CreateBasicProperties(); //props.ContentType = "text/plain"; //props.DeliveryMode = 2; //发布带有自定义标头的消息:Header模式 //props.Headers = new Dictionary<string, object>(); //props.Headers.Add("纬度", 51.5252949); //props.Headers.Add("longitude", -0.0905493); //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes); //设置过期时间,两种方式来设置,1.队列 2.消息 ////设置消息过期: //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); //IBasicProperties props = channel.CreateBasicProperties(); //props.ContentType = "text/plain"; //props.DeliveryMode = 2; //props.Expiration = " 36000000 "; //每条消息超时 //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes); ////启用自动连接恢复 //factory.AutomaticRecoveryEnabled = true; //// 每 10 秒尝试恢复一次,默认为5秒 //factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10); //https://www.cnblogs.com/chenyishi/p/10242162.html ////设置优先级:x-max-priority属性必须设置,否则消息优先级不生效 //IDictionary<string, object> dicQueue = new Dictionary<string, object>(); //dicQueue.Add("x-max-priority", 50); //dicQueue.Add("x-expires",10000); //设置当前队列的过期时间为10000毫秒 //channel.QueueDeclare(queueName, true, false, false, dicQueue); //properties.Priority = (byte)10;//设置消息优先级 //死信的产生有三种 //1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue = false); //2.当前队列中的消息数量已经超过最大长度。 //3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间; //arguments = new Dictionary<string, object> { //{ "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX //{ "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 //{ "x-message-ttl",10000} //设置消息的存活时间,即过期时间 //}; #endregion } /// <summary> /// 设置优先级 /// </summary> /// <param name="value"></param> public void ProducerPriority(int value = 100) { try { var queueName = "test03"; var exchangeName = "changeName03"; //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct var routingKey = "*"; ConnectionFactory factory = new ConnectionFactory(); factory.VirtualHost = "/"; factory.UserName = "admin"; factory.Password = "admin"; factory.Uri = new Uri("amqp://127.0.0.1:5672/"); //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等体建议完全禁用心跳 //将超时设置为 60 秒 factory.RequestedHeartbeat = TimeSpan.FromSeconds(0); //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。 factory.ClientProvidedName = "app:audit component:event-consumer"; IConnection conn = factory.CreateConnection(); //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短. //每次操作关闭和打开新通道通常是不必要的,但可能是适当的 IModel channel = conn.CreateModel(); channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); //x-max-priority属性必须设置,否则消息优先级不生效 IDictionary<string, object> dicQueue = new Dictionary<string, object>(); dicQueue.Add("x-max-priority", 10); //推送和消费一致 //dicQueue.Add("x-message-ttl", 60000); //队列超时 ///new Dictionary<string, object> { { "x-max-priority", 50 } channel.QueueDeclare(queueName, true, false, false, dicQueue); ////声明一个队列并指示服务器不发送任何响应 //channel.QueueDeclareNoWait(queueName, true, false, false, null); channel.QueueBind(queueName, exchangeName, routingKey, null); for (int i = 0; i < value; i++) { var props = channel.CreateBasicProperties(); //队列持久化 props.Persistent = true; //props.ContentType = "text/plain"; props.DeliveryMode = 2; //props.Expiration = "36000000"; //每条消息超时 props.Priority = (byte)(i % 10);//设置消息优先级 //Publishing Messages 方式1: //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes); //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性 var strMsg = $"Hello, world! ....{i}....{DateTime.Now}"; byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg); channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes); Console.WriteLine($"sending...{i}...{strMsg}"); //System.Threading.Thread.Sleep(1000); } //要断开连接,只需关闭通道和连接: channel.Close(); conn.Close(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } /// <summary> /// 死信队列 /// </summary> /// <param name="value"></param> public void ProducerDeadLetter(int value = 100) { try { var queueNameDead = "queueDead"; var exchangeNameDead = "changeNameDead"; var routingKeyDead = "routingKeyDead"; var queueName = "test04"; var exchangeName = "changeName04"; //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct var routingKey = "*"; ConnectionFactory factory = new ConnectionFactory(); factory.VirtualHost = "/"; factory.UserName = "admin"; factory.Password = "admin"; factory.Uri = new Uri("amqp://127.0.0.1:5672/"); //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等体建议完全禁用心跳 //将超时设置为 60 秒 factory.RequestedHeartbeat = TimeSpan.FromSeconds(0); //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。 factory.ClientProvidedName = "app:audit component:event-consumer"; IConnection conn = factory.CreateConnection(); //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短. //每次操作关闭和打开新通道通常是不必要的,但可能是适当的 IModel channel = conn.CreateModel(); //绑定死信队列 channel.ExchangeDeclare(exchangeNameDead, ExchangeType.Topic); channel.QueueDeclare(queueNameDead, true, false, false, null); channel.QueueBind(queueNameDead, exchangeNameDead, routingKeyDead, null); //死信的产生有三种 //1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue = false); //2.当前队列中的消息数量已经超过最大长度。 //3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间; //正常队列 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); //x-max-priority属性必须设置,否则消息优先级不生效 IDictionary<string, object> dicQueue = new Dictionary<string, object>(); dicQueue.Add("x-dead-letter-exchange", exchangeNameDead); //设置当前队列的DLX dicQueue.Add("x-dead-letter-routing-key", routingKeyDead); //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 dicQueue.Add("x-message-ttl", 10000); //队列超时 ///new Dictionary<string, object> { { "x-max-priority", 50 } channel.QueueDeclare(queueName, true, false, false, dicQueue); ////声明一个队列并指示服务器不发送任何响应 //channel.QueueDeclareNoWait(queueName, true, false, false, null); channel.QueueBind(queueName, exchangeName, routingKey, null); for (int i = 0; i < value; i++) { var props = channel.CreateBasicProperties(); //队列持久化 props.Persistent = true; //props.ContentType = "text/plain"; props.DeliveryMode = 2; //发送具有传递模式 2(持久性) //props.Expiration = "36000000"; //每条消息超时 //props.Priority = (byte)(i % 10);//设置消息优先级 //Publishing Messages 方式1: //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes); //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性 var strMsg = $"Hello, world! ....{i}....{DateTime.Now}"; byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg); channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes); Console.WriteLine($"sending...{i}...{strMsg}"); //System.Threading.Thread.Sleep(1000); } //要断开连接,只需关闭通道和连接: channel.Close(); conn.Close(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e) { throw new NotImplementedException(); } private void Channel_BasicAcks(object sender, RabbitMQ.Client.Events.BasicAckEventArgs e) { throw new NotImplementedException(); } } /// <summary> /// 消息对象 /// </summary> public class MqMessage { public string MsgId { get; set; } public string Type { get; set; } public int SendSortNo { get; set; } public string Message { get; set; } public object Data { get; set; } public DateTime PushTime { get; set; } }