注意连接队列服务器时,参数必须和服务器配置一致
private string queue;//队列名
private bool durable;//持久化
private bool exclusive;//独占
private bool autoDelete;//自动删除
默认帐号guest不能远程。
默认访问队列端口是5672,后台网站端口默认是15672。
1、实现发送和接收,类RabbitMQServerT
using System; using MQServer; using RabbitMQ.Client; using System.Text; using System.Configuration; using RabbitMQ.Client.Events; using Newtonsoft.Json; namespace MQServer { /// <summary> /// RabbitMQ消息队列类 /// </summary> public class RabbitMQServerT { protected readonly Action<string, object> receive;//接收回调 private object penetrate;//接收回调透传参数 private string queue;//队列名 private bool durable;//持久化 private bool exclusive;//独占 private bool autoDelete;//自动删除 private bool isBeginInvoke;//接收后业务是否异步,异步的话消息可能在确认前被其他线程读走,造成重复读。//不异步就阻塞。//异步请独占 //接收消息对象 private IConnection connection; private IModel channel; public bool IsReceive; private ConnectionFactory factory; private RabbitMQServerT() { } /// <summary> /// 使用默认配置参数 /// </summary> /// <param name="_receive">消费事件,空则不消费</param> /// <param name="_queue">消息路径最后一层名字,可用于区分业务</param> /// <param name="_penetrate">接收回调透传参数</param> public RabbitMQServerT(Action<string, object> _receive, string _queue = @"hello", object _penetrate = null) { queue = _queue; receive = _receive; penetrate = _penetrate; isBeginInvoke = false; durable = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_durable"].ToString());// exclusive = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_exclusive"].ToString());// autoDelete = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_autoDelete"].ToString());// factory = new ConnectionFactory(); factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];//RabbitMQ服务器 factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];//用户名 factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];//密码 factory.Port = int.Parse(ConfigurationManager.AppSettings["RabbitMQPort"].ToString());// if (!string.IsNullOrWhiteSpace(ConfigurationManager.AppSettings["RabbitMQVirtualHost"])) { factory.VirtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];// } } /// <summary> /// 使用手动参数 /// </summary> /// <param name="_receive">消费事件,空则不消费</param> /// <param name="_queue">消息路径最后一层名字,可用于区分业务</param> /// <param name="_penetrate">接收回调透传参数</param> /// <param name="factory">连接队列服务器</param> /// <param name="durable">持久化</param> /// <param name="exclusive">独占</param> /// <param name="autoDelete">自动删除</param> /// <param name="isBeginInvoke">接收是否异步//异步请独占,否则异常</param> public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory ,bool durable,bool exclusive, bool autoDelete,bool isBeginInvoke) { queue = _queue; receive = _receive; penetrate = _penetrate; this.factory = factory; this.durable = durable; this.exclusive = exclusive; this.autoDelete = autoDelete; this.isBeginInvoke = isBeginInvoke; //异步请独占,不然会重复读 if (isBeginInvoke == true && exclusive == false) { throw new Exception("接收消息队列对象RabbitMQServerT参数isBeginInvoke=true异步执行接收业务,如果要异步执行业务,请独占该消息exclusive=true,否则会被其他线程重复读取。"); } } /// <summary> /// 发送消息 /// </summary> /// <param name="message"></param> public void Send(string message) { //发送消息队列 try { using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);//创建一个消息队列 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queue, null, body); //开始传递 //TLogHelper.Info(message, "RabbitMQServerTSend");//发送的内容写进TXT } } } catch (Exception ex) { TLogHelper.Error(ex.Message, "发送消息队列异常RabbitMQServerTSend:\n" + message); } } /// <summary> /// 发送消息 /// </summary> /// <param name="message"></param> public void Send(RabbitMQMsgModel model) { //发送消息队列 string message = JsonConvert.SerializeObject(model); Send(message); } /// <summary> /// 进行接收消息队列 /// </summary> public void Receive() { if (receive == null) { return; } IsReceive = true; try { connection = factory.CreateConnection(); channel = connection.CreateModel(); channel.QueueDeclare(queue, durable, exclusive, autoDelete, null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { try { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); //接收后业务 if (isBeginInvoke) { receive?.BeginInvoke(message, penetrate,(e)=>{ //确认消息 channel.BasicAck(ea.DeliveryTag, false); },null); } else { receive?.Invoke(message, penetrate); //确认消息 channel.BasicAck(ea.DeliveryTag, false); } } catch (Exception ex) { TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:"+queue); } finally { //再次生成接收 Receive(); } }; channel.BasicConsume(queue, true, consumer); } catch (Exception ex) { TLogHelper.Error(ex.Message, "接收消息队列异常Receive"); } } /// <summary> /// 取消接收 /// </summary> public void EndReceive() { IsReceive=false; channel.Dispose(); connection.Dispose(); } } }