下载
官方网站下载:http://activemq.apache.org/
安装
1:将apache-activemq-5.15.13-bin.tar.gz上传至服务器
2:解压此文件
tar zxvf apache-activemq-5.15.13-bin.tar.gz
3:解压完成后需要修改apache-activemq-5.15.13文件夹的权限
chmod 777 apache-activemq-5.15.13
4:进入apache-activemq-5.15.13\bin
cd apache-activemq-5.15.13\bin
5:赋予activemq权限
chmod 755 activemq
启动
./activemq start
查看是activemq是否运行
./activemq status
说明已经启动成功
如果要停止运行输入./activemq stop
如果启动失败。查看一下启动日志
./activemq console
可能原因(1:端口被占用。2:服务器含有非法字符)
如果是第一种,直接将占用端口停止即可,如果是第二种需要改主机名,修改方法如下:
{redhat/centos上永久修改
[[email protected] ~]# cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=localhost.localdomain
GATEWAY=192.168.10.1
修改network的HOSTNAME项。点前面是主机名,点后面是域名。没有点就是主机名。
[[email protected] ~]# vi /etc/sysconfig/network
NETWORKING=yes
NETWORKING_IPV6=no
HOSTNAME=master
这个是永久修改,重启后生效。目前不知道怎么立即生效。}
打开浏览器输入地址: 列如
http://192.168.1.105:8161/ 即可进入ActiveMQ管理页面
进入主界面
点击进入管理页面
输入用户名和密码 均为 admin
进入主界面
点对点消息列表:
列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
2. JMS入门小Demo
2.1点对点模式
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
2.1.1消息生产者
(1)创建工程jmsDemo ,引入依赖
|
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency> |
(2)创建类QueueProducer main方法代码如下:
|
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 MessageProducer producer = session.createProducer(queue); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); |
上述代码中第4步创建session 的两个参数:
第1个参数 是否使用事务
第2个参数 消息的确认模式
·AUTO_ACKNOWLEDGE = 1 自动确认
·CLIENT_ACKNOWLEDGE = 2 客户端手动确认
·DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
·SESSION_TRANSACTED = 0 事务提交并确认
运行后通过ActiveMQ管理界面查询
2.1.2消息消费者
创建类QueueConsumer ,main方法代码如下:
|
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); |
执行后看到控制台输出
2.1.3运行测试
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。
2.2 发布/订阅模式
2.2.1消息生产者
创建类TopicProducer ,main方法代码如下:
|
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); |
运行效果如下:
2.2.2消息消费者
创建类TopicConsumer ,main方法代码如下:
|
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); |
2.2.3运行测试
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。
记得点赞哦