1、什么是MQ
2、MQ的应用场景
3、ActiveMQ的使用方法。
4、使用消息队列实现商品同步。
2. 同步索引库分析
方案一:在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑。
缺点:业务逻辑耦合度非常高,业务拆分不明确
方案二:业务逻辑在taotao-search中实现,调用服务在taotao-manager实现。业务逻辑分开。
缺点:
服务之间的耦合度变高,启动有先后顺序。
随着调用的服务会越来越多,服务之间的调用越来越复杂,难以管理。
方案三:使用消息队列。
存在的问题:
1、如果MQ挂了,所有相关的服务都挂了
2、MQ有性能的瓶颈,尽量减少消息的内容的大小
技术的选型和具体的业务有关,只选择合适的技术。
如果MQ挂了:
1.通过日志查找原因
2.通知相关的人员修复
3.关键的业务必须保证有备用方案
3. ActiveMQ
MQ是一个消息中间件,比如:ActiveMQ、RabbitMQ、kafka都属于MQ,是MQ的产品。
什么是消息中间件?
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)
3.1. 什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
- 支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA。
- 支持通过JDBC和journal提供高速的消息持久化。
- 从设计上保证了高性能的集群,客户端-服务器,点对点。
- 支持Ajax。
- 支持与Axis的整合。
- 可以很容易得调用内嵌JMS provider,进行测试。
什么是JMS?
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。
消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性:
- StreamMessage : Java原始值的数据流
- MapMessage :一套名称-值对
- TextMessage :一个字符串对象
- ObjectMessage :一个序列化的 Java对象
- BytesMessage :一个字节的数据流
3.2. ActiveMQ的消息形式
对于消息的传递有两种类型:
- 一种是点对点的,即一个生产者和一个消费者一一对应;
- 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
3.3. ActiveMQ的安装
第一步: 把ActiveMQ 的压缩包上传到Linux系统。
Alt+p打开sftp窗口:输入put "F:/java/ziyuan/apache-activemq-5.13.0-bin.tar.gz"
第二步:解压缩: tar zxvf apache-activemq-5.13.0-bin.tar.gz
第三步:启动:使用bin目录下的activemq命令
启动:[root@localhost bin]# ./activemq start
关闭:[root@localhost bin]# ./activemq stop
查看状态:[root@localhost bin]# ./activemq status
进入管理后台:
http://192.168.25.130:8161/admin
用户名:admin
密码:admin
4. ActiveMQ的使用方法
点对点:(Queue)
发布/订阅:(Topic)
下面使用其他的工程来学习:
工程需要添加jar包:
4.1.点对点(Queue)
4.1.1. 生产者(Producer):生产消息,发送端。
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
public class QueueProducer { //生产者发送消息 @Test public void send() throws Exception{ //1.创建一个连接工厂Connectionfactory, 参数:就是要连接的服务器的地址 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616"); //2.通过工厂获取连接对象 创建连接 Connection connection = factory.createConnection(); //3.开启连接 connection.start(); //4.创建一个session对象 提供发送消息等方法 //第一个参数:表示是否开启分布式事务(JTA) 一般是false 不开启。 //第二个参数:就是设置消息的应答模式. 如果第一个参数为false时,第二个参数设置才有意义。用的是自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建目的地 queue, 参数:目的地的名称 Queue queue = session.createQueue("queue-test"); //6.创建个生产者 MessageProducer producer = session.createProducer(queue); //7.构建消息的内容 TextMessage textMessage = session.createTextMessage("queue测试发送的消息"); // TextMessage message = session.createTextMessage(); // message.setText("queue测试发送的消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
4.1.2. 消费者(Consumer):接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
public class QueueCustomer { @Test public void recieve() throws Exception { //1.创建连接的工厂 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616"); //2.创建连接 Connection connection = factory.createConnection(); //3.开启连接 connection.start(); //4.创建session //第一个参数:表示是否开启分布式事务(JTA) 一般是false 不开启。 //第二个参数:就是设置消息的应答模式 如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建接收消息的一个目的地 Queue queue = session.createQueue("queue-test"); //6.创建消费者 MessageConsumer consumer = session.createConsumer(queue); //7.接收消息 打印 //-------------第一种 /*while(true){ Message message = consumer.receive(1000000);//设置接收消息的超时时间 //没有接收到消息就跳出循环 if(message==null){ break; } if(message instanceof TextMessage){ TextMessage message2 = (TextMessage) message; System.out.println("接收的消息为"+message2.getText()); } }*/ //-------------第二种 //设置一个监听器,这里其实开辟了一个新的线程 consumer.setMessageListener(new MessageListener() { //当有消息的时候会执行以下的逻辑 @Override public void onMessage(Message message) { if(message instanceof TextMessage){ TextMessage message2 = (TextMessage) message; try { System.out.println("接收的消息为"+message2.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); Thread.sleep(199999); //8.关闭资源 consumer.close(); session.close(); connection.close(); } }
测试结果:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
4.2. 发布/订阅(Topic)
4.2.1. 生产者(Producer):生产消息,发送端
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
public class TopicProducer { // 发送topic @Test public void send() throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616"); //2.创建连接 Connection connection = factory.createConnection(); //3.开启连接 connection.start(); //4.创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建目的地 topic Topic createTopic = session.createTopic("topic-test"); //6.创建生成者 MessageProducer producer = session.createProducer(createTopic); //7.构建消息对象 TextMessage createTextMessage = session.createTextMessage("topic发送的消息123"); //8.发送消息 producer.send(createTextMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
4.2.2. 消费者(Consumer):接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
以TopicCustomer1为例:
public class TopicCustomer1 { @Test public void reieve() throws Exception { // 1.创建连接的工厂 指定MQ服务器的地址 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616"); // 2.获取连接 Connection connection = connectionFactory.createConnection(); // 3.开启连接 connection.start(); // 4.根据连接对象创建session (提供了操作activmq的方法) //第一个参数:表示是否开启分布式事务(JTA) 一般就是false :表示不开启。 只有设置了false ,第二个参数才有意义。 //第二个参数:表示设置应答模式 自动应答和手动应答 。使用的是自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.根据session创建目的地(destination) Topic topic = session.createTopic("topic-test"); // 6.创建消费者; MessageConsumer consumer = session.createConsumer(topic); // 7.接收消息 // 第一种接收消息.直接接收 只是测试的使用 /* * while(true){ //设置接收消息的超时时间 单位是毫秒 Message receive = * consumer.receive(3000000); * * if(receive==null){ break; } * * //取消息 if(receive instanceof TextMessage){ TextMessage message = * (TextMessage)receive; String text = message.getText();//获取消息的内容 * System.out.println(text); } } */ // 第二种接收消息.设置一个监听器 就是开启了一个新的线程 System.out.println("start"); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage message2 = (TextMessage) message; String text = ""; try { text = message2.getText(); } catch (JMSException e) { e.printStackTrace(); } // 获取消息的内容 System.out.println(text); } System.out.println(); } }); System.out.println("end"); // 睡眠 Thread.sleep(10000000); // 9.关闭资源 consumer.close(); session.close(); connection.close(); } }
4.3. 小结
queue 是点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。
topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。
queue 默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。
topic 默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。
5. Activemq整合spring
5.1. 配置
第一步:引用相关的jar包。
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>