重点参考:NMS Documentation
一、ActiveMQ Queue
在 ActiveMQ 中,Queue 是一种点对点(Point-to-Point)的消息分发方式:生产者向队列中添加一条消息,消费者再从队列中取出并消费这条消息;每条消息保证送达,并且只会被一个消费者接收。
生产者
/// <summary>
/// Sends text messages to a message-broker destination through the NMS API and
/// records every attempt (success or failure) in a timestamped text file in the
/// current working directory.
/// </summary>
class ActiveMQSend
{
    // Example connection strings:
    //   activemq:tcp://activemqhost:61616   (or localhost)
    //   stomp:tcp://activemqhost:61613
    //   ems:tcp://tibcohost:7222
    //   msmq://localhost

    // Broker URI, read from the "activemqIP" application setting.
    private readonly string activemqIP = ConfigurationManager.AppSettings["activemqIP"];

    // Destination name, read from the "SendQueue" application setting.
    private readonly string SendQueue = ConfigurationManager.AppSettings["SendQueue"];

    // Project helper used here only to write the log files.
    private readonly HyExcel.ExcelHelper helper = new HyExcel.ExcelHelper();

    /// <summary>
    /// Sends a text message to the configured queue.
    /// </summary>
    /// <param name="sendmessage">Text payload of the message to send.</param>
    /// <remarks>
    /// Exceptions raised while connecting or sending are not propagated to the
    /// caller; they are written to a log file instead (best-effort send).
    /// </remarks>
    public void Send(string sendmessage)
    {
        try
        {
            Uri connecturi = new Uri(activemqIP);

            // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.
            IConnectionFactory factory = new NMSConnectionFactory(connecturi);

            using (IConnection connection = factory.CreateConnection())
            {
                // Subscribe before creating the session so that asynchronous
                // connection errors raised during setup are reported as well.
                connection.ExceptionListener += Connection_ExceptionListener;

                using (ISession session = connection.CreateSession())
                {
                    // Defaults to queue if type is not specified.
                    IDestination sendDestination = SessionUtil.GetDestination(session, SendQueue);

                    using (IMessageProducer producer = session.CreateProducer(sendDestination))
                    {
                        // Start the connection so that messages will be processed.
                        connection.Start();

                        // Persist messages (to local file) so consumers can fetch them at any
                        // time; non-persistent messages are lost when the ActiveMQ service restarts.
                        producer.DeliveryMode = MsgDeliveryMode.Persistent;

                        // Build and send the message. The correlation id and the two
                        // properties below are fixed sample values.
                        ITextMessage request = session.CreateTextMessage(sendmessage);
                        request.NMSCorrelationID = "abc";
                        request.Properties["NMSXGroupID"] = "cheese";
                        request.Properties["myHeader"] = "Cheddar";
                        producer.Send(request);

                        helper.WriteFile(BuildLogPath(), "云端同步数据:" + sendmessage);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            // Log the full exception (type, message, stack trace, inner exceptions)
            // rather than only the message, so failures can be diagnosed.
            helper.WriteFile(BuildLogPath(), ex.ToString());
        }
    }

    /// <summary>
    /// Builds the path of a log file in the current directory, named after the
    /// current local time.
    /// </summary>
    /// <returns>Full path of the form &lt;current directory&gt;/yyyyMMddHHmmss.txt.</returns>
    private static string BuildLogPath()
    {
        // "HH" is the 24-hour clock; "hh" (12-hour) would give identical names
        // for e.g. 01:00:00 and 13:00:00 on the same day.
        string fileName = DateTime.Now.ToString(
            "yyyyMMddHHmmss",
            System.Globalization.CultureInfo.InvariantCulture) + ".txt";

        return System.IO.Path.Combine(Environment.CurrentDirectory, fileName);
    }

    /// <summary>
    /// Writes exceptions reported asynchronously by the connection to the console.
    /// </summary>
    /// <param name="exception">The exception reported by the connection.</param>
    private void Connection_ExceptionListener(Exception exception)
    {
        Console.WriteLine("生产者发生异常:{0}", exception);
    }
}