因为对JMS的了解也只算入门级,有些概念也很模糊,不过,卤煮会尽可能去介绍的。另外,sample code都调试过可以跑。

1.神马是JMS?

jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。——摘自百度百科

2.JMS组成要素

JMS提供者,JMS客户,JMS生产者,JMS消费者,JMS队列和JMS主题。具体啥意思,还是参考百度百科吧,不过,概念讲起来也木有啥意思,还会把人绕晕。多跑几次sample,回来再看概念可能更清楚些哈。另,博文中的JMS提供者以IBM 的WebSphere MQ为例。

3.JMS模型

JMS模型定义一组可供 Java™ 应用程序用于执行消息传递操作的接口。

以下列表概括了主要的JMS接口:

Destination:Destination 对象是应用程序将消息发往的位置和/或应用程序从其接收消息的源。

ConnectionFactory:ConnectionFactory 对象包括连接的一组配置属性。应用程序使用连接工厂来创建连接。

Connection:Connection 对象包括应用程序与消息传递服务器的活动连接。应用程序使用连接来创建会话。

Session:Session 对象是用于发送和接收消息的单个线程上下文。应用程序使用会话来创建消息、消息生产者和消息使用者。会话是事务性或非事务性会话。

Message:Message 对象包括应用程序发送或接收的消息。

MessageProducer:应用程序使用消息生产者将消息发送到目标。

MessageConsumer:应用程序使用消息使用者来接收已发送到目标的消息。

下图是这些对象之间的关系(摘自IBM Info Center)其实,IBM info center中资料多多的呀。

JMS连接WMQ及收发消息

Destination、ConnectionFactory 或 Connection 对象可供多线程应用程序的不同线程并发使用,但是 Session、MessageProducer 或 MessageConsumer 对象不能供不同线程并发使用。确保不并发使用 Session、MessageProducer 或 MessageConsumer 对象的最简单方法是为每个线程创建单独的 Session 对象。

JMS支持两种消息传递样式:

  • 点到点消息传递
  • 发布/预订消息传递

这两类消息传递也被称为消息传递域,且您可以将两类消息传递都组合在一个应用程序中。在点到点域中,目标是队列,而在发布/预订域中,目标是主题。

 通过JMS1.1 以前的JMS版本,对点到点域的程序设计使用一组接口和方法,而对发布/预订域的程序设计使用另一组接口和方法。两组接口和方法是相似的,但却各自独立。通过JMS1.1,您可以使用一组公共的支持两类消息传递域的接口和方法。公共接口提供了独立于域的每个消息传递域的视图。下表列出了独立于JMS 域的接口及其相应的特定于域的接口。

JMS连接WMQ及收发消息

特别清楚有木有。

4.开发JMS客户端用于连接WMQ及收发消息

下面的code能跑起来的前提是,本地已安装MQ,且创建好队列管理器和相应的QUEUE,TOPIC。

连接方式一:使用的是IBM对于JMS的实现(要导入包com.ibm.mqjms.jar):

  1 package com.demo;
  2 
  3 import java.io.UnsupportedEncodingException;
  4 
  5 import javax.jms.BytesMessage;
  6 import javax.jms.Connection;
  7 import javax.jms.DeliveryMode;
  8 import javax.jms.ExceptionListener;
  9 import javax.jms.JMSException;
 10 import javax.jms.Message;
 11 import javax.jms.MessageConsumer;
 12 import javax.jms.MessageListener;
 13 import javax.jms.MessageProducer;
 14 import javax.jms.Queue;
 15 import javax.jms.QueueConnection;
 16 import javax.jms.QueueConnectionFactory;
 17 import javax.jms.Session;
 18 import javax.jms.TextMessage;
 19 import javax.naming.NamingException;
 20 
 21 import com.ibm.mq.jms.MQQueueConnectionFactory;
 22 
 23 public class JmsQueueDemo {
 24     private static Connection conn = null;
 25     private static Session session = null;
 26     private static MessageProducer producer = null;
 27     private static MessageConsumer consumer = null;
 28     private static QueueConnection qConn = null;
 29 
 30     public static void init() {
 31         // 连接工厂,用com.ibm.mq.jms中的类实现javax.jms中的接口
 32         QueueConnectionFactory qcf = new MQQueueConnectionFactory();
 33 
 34         // 设置连接工厂属性
 35         try {
 36             //设置WMQ所在机器的IP
 37             ((MQQueueConnectionFactory) qcf).setHostName("localhost");
 38             //设置WMQ上队列管理器名
 39             ((MQQueueConnectionFactory) qcf).setQueueManager("TestQM");
 40             //设置WMQ上的通道名
 41             ((MQQueueConnectionFactory) qcf).setChannel("SYSTEM.DEF.SVRCONN");
 42             //设置WMQ上的监听端口
 43             ((MQQueueConnectionFactory) qcf).setPort(1414);
 44             
 45             //由连接工厂创建连接
 46             qConn = qcf.createQueueConnection();
 47 
 48             //建立异常监听器用于监听连接过程中发生的异常
 49              ExceptionListener exceptionListener = new ExceptionListener(){
 50             
 51              //此处可放入更多逻辑,由自己定义
 52              public void onException(JMSException e) {
 53              System.out.println("mq exception");
 54              e.printStackTrace();
 55              System.exit(0);
 56              }
 57             
 58              };
 59              //在连接上面注册监听器
 60              qConn.setExceptionListener(exceptionListener);
 61         } catch (JMSException e) {
 62 
 63             e.printStackTrace();
 64             return;
 65         }
 66     }
 67 
 68     public static void main(String[] args) throws NamingException,
 69             JMSException, UnsupportedEncodingException {
 70 
 71         init();
 72          sendMessage();
 73         // receiveMessage();
 74         receiveWithListener();
 75         destroy();
 76 
 77     }
 78 
 79     public static void sendMessage() throws JMSException {
 80         boolean transacted = false;
 81 
 82         // 非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
 83         session = qConn
 84                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
 85         // 由session创建要发送到的队列
 86         Queue inputQ = session.createQueue("TestQ");
 87         
 88         //由session创建消息发送者
 89         MessageProducer sender = session.createProducer(inputQ);
 90         
 91         //启动连接
 92         qConn.start();
 93         
 94         // 消息由会话创建
 95         TextMessage message = session.createTextMessage();
 96         //设置消息内容
 97         message.setText("this is input message from queue sender");
 98         //这句可有可无的哦,主要用于设置消息属性;方便后面取消息时,取特定类型的消息,如"company='systems'"
 99         message.setStringProperty("company", "systems");
100 
101         // 发送消息,后面的参数依次为消息的持久性设置,消息的优先级,消息在队列的存活时间,设置为0,表示永不失效
102         //DeliveryMode为PERSISTENT表示,队列管理器或者WMQ重启后,消息仍在queue中;NON_PERSISTENT意思相反
103         sender.send(message, DeliveryMode.NON_PERSISTENT, 7, 0);
104     }
105 
106     //使用listener的方式从queue中取消息,可一次取多条消息出来
107     public static void receiveWithListener() throws JMSException {
108 
109         boolean transacted = false;
110 
111         // 非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
112         session = qConn
113                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
114         //从同一个queue中取消息,此处为使用session创建queue
115         Queue outputQ = session.createQueue("TestQ");
116         //使用session创建消息消费者,注意了,后面的那个参数就是消息选择器,用于接收特定类型的消息
117         consumer = session.createConsumer(outputQ, "company='t-systems'");
118         //创建消息监听器
119         MessageListener listener = new MessageListener() {
120             public void onMessage(Message message) {
121                 try {
122                     if (message instanceof TextMessage) {
123                         System.out.println("Listener 接收消息:"
124                                 + ((TextMessage) message).getText());
125                     }
126                 } catch (JMSException e) {
127                     e.printStackTrace();
128                 }
129             }
130         };
131         //注册消息监听器
132         consumer.setMessageListener(listener);
133         //启动连接
134         qConn.start();
135         try {
136             Thread.sleep(10 * 1000);
137         } catch (InterruptedException e) {
138             e.printStackTrace();
139         }
140     }
141     //取消息的另一种方式,手动从queue中取消息,一次只能接收一条消息
142     public static void receiveMessage() throws JMSException,
143             UnsupportedEncodingException {
144 
145         boolean transacted = false;
146 
147         // 非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
148         session = qConn
149                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
150         // 对队列管理器上队列的映射
151         Queue outputQ = session.createQueue("TestQ");
152         consumer = session.createConsumer(outputQ);
153         qConn.start();
154         //此时,若send的message中,设置了message的属性如,"company='systems'",下面的方法是取不到消息的哈
155         //要取到消息可屏蔽到send中设置message属性的语句,或使用consumer.receive("company='systems'")
156         Message msg = consumer.receiveNoWait();
157         //转换消息格式
158         if (msg instanceof TextMessage) {
159             TextMessage message = (TextMessage) msg;
160             System.out.println("received message from queue is:"
161                     + message.getText());
162         } else if (msg instanceof BytesMessage) {
163             BytesMessage message = (BytesMessage) msg;
164             byte buff[] = null;
165             long length = message.getBodyLength();
166             buff = new byte[(int) length];
167             message.readBytes(buff);
168             String textmessage = new String(buff, "UTF-8");
169             System.out.println("received message from queue is:" + textmessage);
170         }
171     }
172     //销毁资源
173     public static void destroy() throws JMSException {
174         if (consumer != null) {
175             consumer.close();
176         }
177         if (producer != null) {
178             producer.close();
179         }
180         if (session != null) {
181             session.close();
182         }
183         if (conn != null) {
184             conn.close();
185         }
186 
187     }
188 
189 }
View Code

相关文章: