官网:https://www.rabbitmq.com/dotnet-api-guide.html

C# rabbitMQ示例

生产者:推送消息

 

    public class RabbitMqPulish
    {
        /// <summary>
        /// 推送
        /// </summary>
        /// <param name="value"></param>
        public void Producer(int value)
        {
            try
            {
                var queueName = "test01";
                var exchangeName = "changeName01";
                var exchangeType = ExchangeType.Fanout; //fanout、topic、fanout direct
                var routingKey = "*";
                var uri = new Uri("amqp://127.0.0.1:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "admin",
                    Password = "admin",
                    RequestedHeartbeat = TimeSpan.FromMilliseconds(0),
                    Endpoint = new AmqpTcpEndpoint(uri)
                };

                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //设置交换器的类型
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        //声明一个队列,设置队列是否持久化,排他性,与自动删除
                        channel.QueueDeclare(queueName, true, false, false, null);
                        //绑定消息队列,交换器,routingkey
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        //channel.QueueBind("test02", exchangeName, routingKey); //绑定多个队列,多端接收
                        channel.ConfirmSelect();
                        channel.BasicAcks += (sender ,e)=>{
                            Console.WriteLine($"BasicAcks..Pulish..{e.DeliveryTag}");
                        };
                        var properties = channel.CreateBasicProperties();
                        //队列持久化
                        properties.Persistent = true;
                        properties.DeliveryMode = 2; //发送具有传递模式 2(持久性)
                        //properties.Expiration = "36000000";
                        for (int i = 0; i < value; i++)
                        {
                            var pubMessage = new MqMessage()
                            {
                                MsgId = Guid.NewGuid().ToString(),
                                PushTime = DateTime.Now,
                                SendSortNo = i,
                                Message = "测试发送消息 " + i + ""
                            };
                            //JsonConvert.DeserializeObject
                            var body = Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(pubMessage));
                            //发送信息
                            channel.BasicPublish(exchangeName, routingKey, properties, body);

                            Console.WriteLine($"sending...{i}..." + System.Text.Json.JsonSerializer.Serialize(pubMessage));
                            System.Threading.Thread.Sleep(1000);
                        }

                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// 推送Demo
        /// </summary>
        /// <param name="value"></param>
        public void PublishDemo(int value = 100)
        {
            #region 使用端点列表

            //var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
            //  new AmqpTcpEndpoint("hostname"),
            //  new AmqpTcpEndpoint("localhost")
            //};
            //IConnection conn = factory.CreateConnection(endpoints);

            #endregion

            var queueName = "test02";
            var exchangeName = "changeName02";
            //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
            var routingKey = "*";

            ConnectionFactory factory = new ConnectionFactory();
            factory.VirtualHost = "/";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.Uri = new Uri("amqp://127.0.0.1:5672/");
            //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
            //将超时设置为 60 秒
            factory.RequestedHeartbeat = TimeSpan.FromSeconds(60);
            //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
            factory.ClientProvidedName = "app:audit component:event-consumer";
            IConnection conn = factory.CreateConnection();
            //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
            //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
            IModel channel = conn.CreateModel();

            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
            ////x-max-priority属性必须设置,否则消息优先级不生效
            //IDictionary<string, object> dicQueue = new Dictionary<string, object>();
            //dicQueue.Add("x-max-priority", 50);
            //dicQueue.Add("x-message-ttl", 60000); //队列超时
            ///// new Dictionary<string, object> { { "x-max-priority", 50 }
            //channel.QueueDeclare(queueName, true, false, false, dicQueue);

            channel.QueueDeclare(queueName, true, false, false, null);
            ////声明一个队列并指示服务器不发送任何响应
            //channel.QueueDeclareNoWait(queueName, true, false, false, null);
            channel.QueueBind(queueName, exchangeName, routingKey, null);
            ////推送的确认
            //channel.ConfirmSelect();
            //channel.BasicAcks += (sender, e) => {
            //    Console.WriteLine($"BasicAcks..Pulish..{e.DeliveryTag}");
            //};
            ////客户端可以订阅 IModel.BasicReturn事件。如果没有附加到事件的侦听器,则返回的消息将被静默丢弃。
            //channel.BasicReturn += (sender, e) => {
            //    Console.WriteLine($"Pulish..处理不可路由的消息..{e.Exchange}");
            //}; 

            //IModel#QueueDeclarePassive和IModel#ExchangeDeclarePassive是用于被动声明的方法
            var response = channel.QueueDeclarePassive(queueName);
            // 返回队列中处于就绪状态的消息数
            var messageCount = response.MessageCount;
            // 返回队列拥有的消费者数量
            var consumerCount = response.ConsumerCount;
            Console.WriteLine($"队列:{queueName},就绪状态的消息数:{messageCount},消费者数量:{consumerCount}");

            var props = channel.CreateBasicProperties();
            //队列持久化
            props.Persistent = true;
            //props.ContentType = "text/plain";
            props.DeliveryMode = 2;   //发送具有传递模式 2(持久性)
            //props.Expiration = "36000000";   //每条消息超时
            props.Priority = (byte)10;//设置消息优先级

            for (int i = 0; i < value; i++)
            {
                //Publishing Messages 方式1:
                //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
                //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
                //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
                var strMsg = $"Hello, world! ....{i}....{DateTime.Now}";
                byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg);
                channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                Console.WriteLine($"sending...{i}...{strMsg}" );
                System.Threading.Thread.Sleep(1000);
            }

            //要断开连接,只需关闭通道和连接:
            channel.Close();
            conn.Close();

            #region 一些设置

            //type:可选项为,fanout,direct,topic,headers。区别如下:
            //fanout:发送到所有与当前Exchange绑定的Queue中
            //direct:发送到与消息的routeKey相同的Rueue中
            //topic:fanout的模糊版本
            //headers:发送到与消息的header属性相同的Queue中

            //durable:持久化
            //autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
            //exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

            ////显式删除队列或交换:
            //channel.QueueDelete("queue-name", false, false);
            ////只有当队列为空时才可以删除队列
            //channel.QueueDelete("queue-name", false, true);
            ////或者如果它没有被使用(没有任何消费者)
            //channel.QueueDelete("queue-name", true, false);
            ////可以清除队列(删除其所有消息):
            //channel.QueuePurge("队列名称");

            ////带有自定义标头的消息
            //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

            //IBasicProperties props = channel.CreateBasicProperties();
            //props.ContentType = "text/plain";
            //props.DeliveryMode = 2;
            //发布带有自定义标头的消息:Header模式
            //props.Headers = new Dictionary<string, object>();
            //props.Headers.Add("纬度", 51.5252949);
            //props.Headers.Add("longitude", -0.0905493);

            //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
            //设置过期时间,两种方式来设置,1.队列 2.消息
            ////设置消息过期:
            //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

            //IBasicProperties props = channel.CreateBasicProperties();
            //props.ContentType = "text/plain";
            //props.DeliveryMode = 2;
            //props.Expiration = " 36000000 "; //每条消息超时

            //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);

            ////启用自动连接恢复
            //factory.AutomaticRecoveryEnabled = true;
            //// 每 10 秒尝试恢复一次,默认为5秒
            //factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
            //https://www.cnblogs.com/chenyishi/p/10242162.html
            ////设置优先级:x-max-priority属性必须设置,否则消息优先级不生效
            //IDictionary<string, object> dicQueue = new Dictionary<string, object>();
            //dicQueue.Add("x-max-priority", 50);
            //dicQueue.Add("x-expires",10000); //设置当前队列的过期时间为10000毫秒
            //channel.QueueDeclare(queueName, true, false, false, dicQueue);
            //properties.Priority = (byte)10;//设置消息优先级

            //死信的产生有三种
            //1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue = false);
            //2.当前队列中的消息数量已经超过最大长度。
            //3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
            //arguments = new Dictionary<string, object> {
            //{ "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX
            //{ "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
            //{ "x-message-ttl",10000} //设置消息的存活时间,即过期时间
            //};


            #endregion
        }

        /// <summary>
        /// 设置优先级
        /// </summary>
        /// <param name="value"></param>
        public void ProducerPriority(int value = 100)
        {
            try
            {
                var queueName = "test03";
                var exchangeName = "changeName03";
                //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                var routingKey = "*";

                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                //将超时设置为 60 秒
                factory.RequestedHeartbeat = TimeSpan.FromSeconds(0);
                //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                factory.ClientProvidedName = "app:audit component:event-consumer";
                IConnection conn = factory.CreateConnection();
                //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                IModel channel = conn.CreateModel();

                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                //x-max-priority属性必须设置,否则消息优先级不生效
                IDictionary<string, object> dicQueue = new Dictionary<string, object>();
                dicQueue.Add("x-max-priority", 10); //推送和消费一致
                //dicQueue.Add("x-message-ttl", 60000); //队列超时
                ///new Dictionary<string, object> { { "x-max-priority", 50 }
                channel.QueueDeclare(queueName, true, false, false, dicQueue);
                ////声明一个队列并指示服务器不发送任何响应
                //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                channel.QueueBind(queueName, exchangeName, routingKey, null);
                
                for (int i = 0; i < value; i++)
                {
                    var props = channel.CreateBasicProperties();
                    //队列持久化
                    props.Persistent = true;
                    //props.ContentType = "text/plain";
                    props.DeliveryMode = 2;
                    //props.Expiration = "36000000";   //每条消息超时
                    props.Priority = (byte)(i % 10);//设置消息优先级

                    //Publishing Messages 方式1:
                    //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
                    //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
                    //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
                    var strMsg = $"Hello, world! ....{i}....{DateTime.Now}";
                    byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg);
                    
                    channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                    Console.WriteLine($"sending...{i}...{strMsg}");
                    //System.Threading.Thread.Sleep(1000);
                }

                //要断开连接,只需关闭通道和连接:
                channel.Close();
                conn.Close();

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// 死信队列
        /// </summary>
        /// <param name="value"></param>
        public void ProducerDeadLetter(int value = 100)
        {
            try
            {
                var queueNameDead = "queueDead";
                var exchangeNameDead = "changeNameDead";
                var routingKeyDead = "routingKeyDead";

                var queueName = "test04";
                var exchangeName = "changeName04";
                //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                var routingKey = "*";

                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                //将超时设置为 60 秒
                factory.RequestedHeartbeat = TimeSpan.FromSeconds(0);
                //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                factory.ClientProvidedName = "app:audit component:event-consumer";
                IConnection conn = factory.CreateConnection();
                //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                IModel channel = conn.CreateModel();
                //绑定死信队列
                channel.ExchangeDeclare(exchangeNameDead, ExchangeType.Topic);
                channel.QueueDeclare(queueNameDead, true, false, false, null);
                channel.QueueBind(queueNameDead, exchangeNameDead, routingKeyDead, null);
                //死信的产生有三种
                //1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue = false);
                //2.当前队列中的消息数量已经超过最大长度。
                //3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
                //正常队列
                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                //x-max-priority属性必须设置,否则消息优先级不生效
                IDictionary<string, object> dicQueue = new Dictionary<string, object>();
                dicQueue.Add("x-dead-letter-exchange", exchangeNameDead); //设置当前队列的DLX
                dicQueue.Add("x-dead-letter-routing-key", routingKeyDead); //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                dicQueue.Add("x-message-ttl", 10000); //队列超时
                ///new Dictionary<string, object> { { "x-max-priority", 50 }
                channel.QueueDeclare(queueName, true, false, false, dicQueue);
                ////声明一个队列并指示服务器不发送任何响应
                //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                channel.QueueBind(queueName, exchangeName, routingKey, null);

                for (int i = 0; i < value; i++)
                {
                    var props = channel.CreateBasicProperties();
                    //队列持久化
                    props.Persistent = true;
                    //props.ContentType = "text/plain";
                    props.DeliveryMode = 2; //发送具有传递模式 2(持久性)
                    //props.Expiration = "36000000";   //每条消息超时
                    //props.Priority = (byte)(i % 10);//设置消息优先级

                    //Publishing Messages 方式1:
                    //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
                    //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
                    //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
                    var strMsg = $"Hello, world! ....{i}....{DateTime.Now}";
                    byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg);

                    channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                    Console.WriteLine($"sending...{i}...{strMsg}");
                    //System.Threading.Thread.Sleep(1000);
                }

                //要断开连接,只需关闭通道和连接:
                channel.Close();
                conn.Close();

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            throw new NotImplementedException();
        }

        private void Channel_BasicAcks(object sender, RabbitMQ.Client.Events.BasicAckEventArgs e)
        {
            throw new NotImplementedException();
        }
    }

    /// <summary>
    /// 消息对象
    /// </summary>
    public class MqMessage
    {
        public string MsgId { get; set; }

        public string Type { get; set; }
        public int SendSortNo { get; set; }

        public string Message { get; set; }

        public object Data { get; set; }

        public DateTime PushTime { get; set; }
    }
Pulish

相关文章:

  • 2022-12-23
  • 2021-12-01
  • 2022-12-27
  • 2021-12-29
  • 2021-04-01
  • 2021-12-06
  • 2021-11-07
猜你喜欢
  • 2021-11-25
  • 2021-04-11
  • 2021-08-24
  • 2021-09-14
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案