重点参考:NMS Documentation

一、ActiveMQ Queue

在ActiveMQ中Queue是一种点对点的消息分发方式,生产者在队列中添加一条消息,然后消费者消费一条消息,这条消息保证送达并且只会被一个消费者接收

生产者

  class ActiveMQSend
    {
        // Example connection strings:
        //    activemq:tcp://activemqhost:61616    //localhost
        //    stomp:tcp://activemqhost:61613
        //    ems:tcp://tibcohost:7222
        //    msmq://localhost

        string activemqIP = ConfigurationManager.AppSettings["activemqIP"];
        string SendQueue = ConfigurationManager.AppSettings["SendQueue"];
        HyExcel.ExcelHelper helper = new HyExcel.ExcelHelper();

        /// <summary>
        /// 发送消息到队列
        /// </summary>
        public void Send(string sendmessage)
        {
            try
            {
                Uri connecturi = new Uri(activemqIP);

                // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.
                IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                using (IConnection connection = factory.CreateConnection())
                using (ISession session = connection.CreateSession())
                {
                    //Defaults to queue if type is not specified:
                    IDestination sendDestination = SessionUtil.GetDestination(session, SendQueue);  //发送目的地

                    // Create a consumer and producer
                    using (IMessageProducer producer = session.CreateProducer(sendDestination))
                    {
                        // Start the connection so that messages will be processed.
                        connection.ExceptionListener += Connection_ExceptionListener; ;
                        connection.Start();

                        producer.DeliveryMode = MsgDeliveryMode.Persistent; //消息持久化(到本地文件),消费者可以随时取到数据,而未持久化的发送数据在activemq服务重启之后数据是会清掉的。

                        // Send a message
                        ITextMessage request = session.CreateTextMessage(sendmessage);
                        request.NMSCorrelationID = "abc";
                        request.Properties["NMSXGroupID"] = "cheese";
                        request.Properties["myHeader"] = "Cheddar";
                        producer.Send(request);


                        string fullPath = Environment.CurrentDirectory + "\\" + DateTime.Now.ToString("yyyyMMddhhmmss") + ".txt";
                        helper.WriteFile(fullPath, "云端同步数据:" + sendmessage);
                    }
                }
            }
            catch (Exception ex)
            {
                string fullPath = Environment.CurrentDirectory + "\\" + DateTime.Now.ToString("yyyyMMddhhmmss") + ".txt";
                helper.WriteFile(fullPath, ex.Message);
            }
        }

        private void Connection_ExceptionListener(Exception exception)
        {
            Console.WriteLine("生产者发生异常:{0}", exception);
        }
    }
View Code

相关文章: