注意连接队列服务器时,参数必须和服务器配置一致

  private string queue;//队列名
        private bool durable;//持久化
        private bool exclusive;//独占
        private bool autoDelete;//自动删除

默认帐号guest不能远程。

默认访问队列端口是5672,后台网站端口默认是15672。

 

1、实现发送和接收,类RabbitMQServerT

using System;
using MQServer;
using RabbitMQ.Client;
using System.Text;
using System.Configuration;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;

namespace MQServer
{
    /// <summary>
    /// RabbitMQ消息队列类
    /// </summary>
    public class RabbitMQServerT 
    {
        protected readonly Action<string, object> receive;//接收回调
        private object penetrate;//接收回调透传参数
        private string queue;//队列名
        private bool durable;//持久化
        private bool exclusive;//独占
        private bool autoDelete;//自动删除
        private bool isBeginInvoke;//接收后业务是否异步,异步的话消息可能在确认前被其他线程读走,造成重复读。//不异步就阻塞。//异步请独占

        //接收消息对象
        private IConnection connection;
        private IModel channel;

        public bool IsReceive;

        private ConnectionFactory factory;
        private RabbitMQServerT()
        {
        }

        /// <summary>
        /// 使用默认配置参数
        /// </summary>
        /// <param name="_receive">消费事件,空则不消费</param>
        /// <param name="_queue">消息路径最后一层名字,可用于区分业务</param>
        /// <param name="_penetrate">接收回调透传参数</param>
        public RabbitMQServerT(Action<string, object> _receive, string _queue = @"hello", object _penetrate = null)
        {
            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;
            isBeginInvoke = false;

            durable = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_durable"].ToString());//
            exclusive = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_exclusive"].ToString());//
            autoDelete = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_autoDelete"].ToString());//

            factory = new ConnectionFactory();
            factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];//RabbitMQ服务器
            factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];//用户名
            factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];//密码
            factory.Port = int.Parse(ConfigurationManager.AppSettings["RabbitMQPort"].ToString());//
            if (!string.IsNullOrWhiteSpace(ConfigurationManager.AppSettings["RabbitMQVirtualHost"]))
            {
                factory.VirtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];//
            }
        }

        /// <summary>
        /// 使用手动参数
        /// </summary>
        /// <param name="_receive">消费事件,空则不消费</param>
        /// <param name="_queue">消息路径最后一层名字,可用于区分业务</param>
        /// <param name="_penetrate">接收回调透传参数</param>
        /// <param name="factory">连接队列服务器</param>
        /// <param name="durable">持久化</param>
        /// <param name="exclusive">独占</param>
        /// <param name="autoDelete">自动删除</param>
        /// <param name="isBeginInvoke">接收是否异步//异步请独占,否则异常</param>
        public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory
            ,bool durable,bool exclusive, bool autoDelete,bool isBeginInvoke)
        {
            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;

            this.factory = factory;
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
            this.isBeginInvoke = isBeginInvoke;

            //异步请独占,不然会重复读
            if (isBeginInvoke == true && exclusive == false) 
            {
                throw new Exception("接收消息队列对象RabbitMQServerT参数isBeginInvoke=true异步执行接收业务,如果要异步执行业务,请独占该消息exclusive=true,否则会被其他线程重复读取。");
            }
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="message"></param>
        public void Send(string message)
        {
            //发送消息队列
            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);//创建一个消息队列
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", queue, null, body); //开始传递
                        //TLogHelper.Info(message, "RabbitMQServerTSend");//发送的内容写进TXT
                    }
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "发送消息队列异常RabbitMQServerTSend:\n" + message);
            }
        }
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="message"></param>
        public void Send(RabbitMQMsgModel model)
        {
            //发送消息队列
            string message = JsonConvert.SerializeObject(model);
            Send(message);
        }

        /// <summary>
        /// 进行接收消息队列
        /// </summary>
        public void Receive()
        {
            if (receive == null)
            {
                return;
            }
            IsReceive = true;
            try
            {
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
              
                channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);

                        //接收后业务
                        if (isBeginInvoke)
                        {
                            receive?.BeginInvoke(message, penetrate,(e)=>{
                                //确认消息
                                channel.BasicAck(ea.DeliveryTag, false);
                            },null);
                        }
                        else 
                        {
                            receive?.Invoke(message, penetrate);
                            //确认消息
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                    catch (Exception ex)
                    {
                        TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:"+queue);
                    }
                    finally
                    {
                        //再次生成接收
                        Receive();
                    }
                };
                channel.BasicConsume(queue, true, consumer);
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息队列异常Receive");
            }
        }

        /// <summary>
        /// 取消接收
        /// </summary>
        public void EndReceive()
        {
            IsReceive=false;
            channel.Dispose();
            connection.Dispose();
        }
    }
}
View Code

相关文章: