1. 消息中间件
平时工作中一直使用到activeMQ,但是一直没有机会进行总结,现在由于公司业务需求,需要将消息中间键由actveMQ变成RocketMQ,现在对消息中间键进行简单的整理.
2. 什么是MOM(面向消息中间键)?
MOM(Message-oriented middleware)就是面向消息中间件,是用于以分布式应用或系统中的异步,松耦合,可靠,可扩展和安全通信的一类软件.MOM总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合.
目前在生产环境常用的消息队列有:
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。常用对比如下:
3. 什么是activeMQ?
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范(JMS规范)的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
4. activeMQ 两种模式.
-
点对点(P2P) : 即每产生一个消息并发送,只有一个消息消费者和其一一对应.
点对点模式主要建立在一个队列上边,当连接到一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息首先进入队列中,如果有接收端在监听,则会发送给接收端,否则保存在activeMQ服务器,知道发现接收端接收消息.点对点可以有多个发送端和多个接收端,但是一个消息只能被一个接收端消费.
-
发布订阅(Pub/Sub): 即每产生一个消息并发送,可以有多个消费者对消息进行接收和消费.
发布/订阅模式,同样可以有多个发送端和多个接收端,但是接收端如果没有监听消息,那么activeMQ默认不会保存信息,将会认为消息已经发送出去了. 发送端发送的消息如果接收端不在线,收不到消息,即使以后连接上线也不会接收到已经发送的消息, 发布订阅模式中,activeMQ不对保存消息,如果需要保存消息要进一步进行设置.
5.activeMQ使用.
- 生产者:发送消息,发送端
第一步: 创建ConnectionFactory,需要指定服务端ip以及端口号.
第二步:使用ConncetionFactory对象创建一个Connection对象.
第三步:开启连接,调用activeMQ的start方法.
第四步:使用Connection对象创建一个session对象.
第五步:使用session创建一个Destination对象(topic,queue),此外创建一个Queue对象.
第六步: 使用Session 创建一个Producer对象.
第七步:创建一个Message对象,创建一个TextMessage对象.
第八步: 使用Productor对象发送消息.
第九步: 关闭资源.
public void testQueueProducer() throws Exception {
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
//brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
//第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
//参数:队列的名称。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
// 第七步:创建一个Message对象,创建一个TextMessage对象。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,我要发送第一个信息了......");
// 第八步:使用Producer对象发送消息。
producer.send(textMessage);
// 第九步:关闭资源。
producer.close();
session.close();
connection.close();
}
- 消费者:接收消息,消费消息
第一步:创建一个ConncetionFactory对象.
第二步:Connection 对象创建一个Connection对象.
第三步:Connection对象调用start方法开启连接.
第四步:使用 Connection对象创建一个Session对象.
第五步: 使用Connection对象创建一个Destination对象,和发送端保持一致,其中队列Queue的名字也要保持一致.
第六步:使用session创建一个Consumer对象.
第七步:接收消息.
第八步:打印消息.
第九步:关闭资源.
public void testQueueConsumer() throws Exception {
// 第一步:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:从ConnectionFactory对象中获得一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接。调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取消息的内容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待键盘输入
//这里因为是DEMO的原因,所以需要在这里等待键盘输入,也就是进行阻塞,但是在spring框架中使用activeMQ,因为有框架持有.
System.in.read();
// 第九步:关闭资源
consumer.close();
session.close();
connection.close();
}
消息中间件的优缺点总结:
-
ActiveMQ:
非常成熟,功能强大,在业内大量的公司以及项目中都有应用
偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用. -
RabbitMQ:
erlang语言开发,性能极其好,延时很低;
吞吐量到万级,MQ功能比较完备
而且开源提供的管理界面非常棒,用起来很好用
社区相对比较活跃,几乎每个月都发布几个版本分
在国内一些互联网公司近几年用rabbitmq也比较多一些
但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
优点:
- 在高吞吐,高可用上较弱.
- 支持多种客户端语言.
- 由于erlang语言的特性,性能也比较好,使用RAM模式时,性能很好.
- 管理界面丰富,在互联网公司有大规模应用.
缺点:
- erlang语言难度较大,集群不支持动态扩展.
- 不支持事务,消息吞吐能力有限.
- 消息堆积是,性能明显降低.
- RocketMQ:
接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障
日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控.
优点:
- 在高吞吐量,低延迟,高可用上有非常好的表现,消息堆积时性能也很好.
- api,系统设计都更适合在业务处理场景.
- 支持多种消费方式.
- 支持broker消息过滤.
- 支持事物.
- 提供消息顺序消费能力:consumer可以水平扩展,消费能力强.
- 集群模式规模在50台左右,单日处理消息上百亿.
缺点:
- 相比于kafka,使用者少,生态不够完善.
- 不支持主从自动切换.
- 客户端只支持java.
- Kafka:
kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展
同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略.
这个特性天然适合大数据实时计算以及日志收集.
优点:
- 在高吞吐,低延迟,高可用集群热扩展集群容错上有非常好的表现.
- producer端提供缓存,压缩功能,节省性能,提高效率.
- 提供顺序消费能力.
- 提供多种客户端语言.
- 生态完善.在大数据处理有噢诶套设施.
缺点: - 消费集群数目受到分区数目的限制.
- 单机topic多时,性能明显降低.
- 不支持事务.