https://www.rabbitmq.com/tutorials/amqp-concepts.html
1 什么是AMQP 0-9-1?
AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.
Messaging brokers receive messages from publishers (applications that publish them, also known as producers) and route them to consumers (applications that process them).
Since it is a network protocol, the publishers, consumers and the broker can all reside on different machines.
3 amqp 模型:
The AMQP 0-9-1 Model has the following view of the world: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then the broker either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.
这个过程中,因为网络可能有问题,所以消息不一定可达,所以出现了发送确认和消费确认机制,后续介绍。
1 )生产者 或者发布消息者
业务逻辑如下:
1 配置连接工厂
2 建立TCP连接
3 在TCP连接的基础上创建通道
4 声明一个队列
5 发送消息
生产者和消费者是使用TCP和RabbitMQ建立长连接。但是TCP是非常耗费资源,为了减少TCP连接的数据,RabbitMQ提出通道的概念。在同一个TCP连接上可以建立多个通道,即可以把通道理解成共享一个TCP连接的多个轻量化连接。通道不是线程安全的,不推荐多个线程共用一个通道。在多线程/进程应用中,为每个线程/进程开启一个通道(channel),并且这些通道不能被线程/进程共享。
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
// 配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
Connection connection = null;
Channel channel = null;
// 建立TCP连接
connection = factory.newConnection();
// 在TCP连接的基础上创建通道
channel = connection.createChannel();
生产者把消息发送到了exchange
2) 交换机
交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。RabbitMQ的提供了四种交换机
Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange(主题交换机)
Headers exchange(头交换机)
我的例子里只使用了默认交换机:默认交换机(default exchange)实际上是一个由消息代理预先声明好的名字为空字符串的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
其他的交换机,
交换机声明的时候可以带有其他属性,主要的如下
- Name
- Durability (exchanges survive broker restart)
- Auto-delete (exchange is deleted when last queue is unbound from it)
- Arguments (optional, used by plugins and broker-specific features)
3) 绑定关系
Bindings are rules that exchanges use (among other things) to route messages to queues. To instruct an exchange E to route messages to a queue Q, Q has to be bound to E. Bindings may have an optional routing key attribute used by some exchange types. The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. In other words, the routing key acts like a filter.
To draw an analogy:
- Queue is like your destination in New York city
- Exchange is like JFK airport
- Bindings are routes from JFK to your destination. There can be zero or many ways to reach it
Having this layer of indirection enables routing scenarios that are impossible or very hard to implement using publishing directly to queues and also eliminates certain amount of duplicated work application developers have to do.
If AMQP message cannot be routed to any queue (for example, because there are no bindings for the exchange it was published to) it is either dropped or returned to the publisher, depending on message attributes the publisher has set.
4) 队列
用于存储即将要发送的消息(message)。它的大小理论上是无限的,唯一限制大小是RabbitMQ的内存和磁盘大小.队列在声明(declare)后才能被使用。声明队列操作是幂等操作,即如果一个队列尚不存在,声明一个队列会创建它,如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响,如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
创建队列方法:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException
队列配置参数说明:
第一个参数queue
定义队列的名称,最多255字节的一个utf-8字符串。如果此参数为空字符串,则RabbitMQ会自动生成一个唯一的队列名,在同一个通道的后续的方法中,我们可以使用空字符串来表示之前生成的队列名称,之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称
以”amq.”开始的队列名称被预留做消息代理内部使用,不能被应用使用,否则抛出403 (ACCESS_REFUSED)错误
第二个参数 durable
是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
第三个参数 execulusive
表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参数的优先级高于durable
第四个参数 autoDelete
当没有生成者/消费者使用此队列时,此队列会被自动删除。
(即当最后一个消费者退订后即被删除)
第五个参数 arguments
其它的扩展属性,如一些消息代理用他来完成类似与TTL功能时相关的参数
声明一个队列代码如下:
channel.queueDeclare(QUEUE_NAME+4, false, false, false, null);
备注:还有一些特殊队列
临时队列:等价于建立一个durable=false,execulusive=true,autoDelete=true,名称为随机的队列
内部队列:以”amq.”开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出
5) 发送消息和消息属性
发送消息方法 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
详细参数说明:
第一个参数exchange:
执行队列使用的交换机(exchange),交换机拿到一个消息之后将它路由给一个或零个队列。关于这块的内容后面会详细说明。如果值为空字符串,则表示使用默认交换机。默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
第二个参数routingKey:
指定路由键,如果使用默认交换机,则此值和队列名称相同
第三个参数props:
除了Routing key外,消息还可以配置如下属性,常用属性如下:
○ Content type:内容类型
○ Content encoding:内容编码
○ headers; 消息的头信息,类似http协议中的header属性
○ Delivery mode (persistent or not) 投递模式(持久化 或 非持久化): 如果此值是persistent ,则此消息存储在磁盘上。如果服务器重启,系统会保证收到的持久化消息未丢失,将消息以持久化方式发布时,会对性能造成一定的影响
○ Message priority:消息优先级
○ String correlationId;
○ ReplyTo; 反馈队列
○ Expiration period: 消息有效期
○ String messageId;
○ Message publishing timestamp:消息发送的时间戳
○ String type;
○ String userId;
○ Publisher application id:发布应用的ID
○ String clusterId;
第四个参数body:
消息的有效负载pPayload(消息实际携带的数据),它被RabbitMQ当作不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。
发送消息代码如下
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
6)消费者
消费者消费消息有2种方式
1 是订阅队列,然后等待服务器推送消息
2 每次需要的时候主动从队列获取消息
1 主动获取比较简单,是通过channel.basicGet方法实现。
2 订阅实现的方式是 通过com.rabbitmq.client.Consumer定义回调方法。回调接口用于处理订阅的消息。DefaultConsumer是此接口的默认,官方也推荐如果要实现自己的Consumer,则继承此类
// 默认消费者实现
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [HelloworldRecv] Received '" + message + "'");
}
};
接收消息方法
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
详细参数说明:
第一个参数queue
要接收消息的队列名称
第二个参数autoAck
消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉,而且网络原因也有可能引起各种问题。为了解决这个问题,RabbitMQ给我们两种建议:
○ 自动确认模式(automatic acknowledgement model):当RabbitMQ将消息发送给消费者后,立即从内存中删除消息。
○ 显式确认模式(explicit acknowledgement model):只有消费者发送确认回执(acknowledgement),RabbitMQ才删除消息。如果一个消费者在尚未发送确认回执的情况下挂掉了,那RabbitMQ会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,RabbitMQ会死等下一个注册到此队列的消费者,然后再次尝试投递。
第三个参数callback
指定处理消息的回调类
接收消息代码如下
// 接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
7) 消息确认方式
消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题,对于此AMQP有两种处理方式:
○ 自动确认模式(automatic acknowledgement model):当RabbbitMQ将消息发送给应用后,消费者端自动回送一个确认消息,此时RabbitMQ删除此消息。
○ 显式确认模式(explicit acknowledgement model):消费者收到消息后,可以在执行一些逻辑后,消费者自己决定什么时候发送确认回执(acknowledgement),RabbitMQ收到回执后才删除消息,这样就保证消费端不会丢失消息
如果一个消费者在尚未发送确认回执的情况下挂掉了,那么消息会被重新放入队列,并且在还有其他消费者存在于此队列的前提下,立即投递给另外一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
RabbitMQ里的消息是不会过期。当消费者挂掉后,RabbitMQ会不断尝试重推。所有单个消息的推送可能花费很长的时间
是否开启自动确认模式由以下方法的autoAck属性决定
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
代码实现
a 自动确认模式:只需要设置此值为autoAck为true即可,可以参考上一篇文章的用法
b. 显示确认模式:
a. channel.basicConsume()第二个参数autoAck值为false
b. 收到消息后,必须调用 channel.basicAck 向rabbitMQ发送确认回执
// 默认消费者实现
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [WorkQueuesRecv-" +id+ "] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [WorkQueuesRecv-" +id+ "] Done");
// 情况一:对处理好的消息进行应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 获取消息:
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
消息确认方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
方法详细参数如下:
第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,(任何channel上发布的第一条消息的deliveryTag为1,此后的每一条消息都会加1),deliveryTag在channel范围内是唯一的
第二个参数multiple:批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认; 如果值为false,则只对当前收到的消息进行确认
备注:
如果在获取消息时采用不自动应答,但是获取消息后不调用basicAck,则后果会很严重。RabbitMQ会认为消息没有投递成功,不仅所有的消息都会保留到内存中,而且在客户重新连接后,会将所有的消息重新投递一遍
8)拒绝消息
当消费者接收到某条消息后,处理过程有可能失败,这时消费者可以拒绝此消息。在拒绝消息时,消费者会告诉RabbitMQ如何处理这条消息:销毁它或者重新放入队列。
可以有两种方式拒绝此消息
a. channel.basicReject:只支持对一条消息进行拒绝
拒绝方法:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
1
方法详细参数如下:
第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
第二个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
代码台下:
channel.basicReject(envelope.getDeliveryTag(), true);
b. channel.basicNack
channel.basicNack是 channel.basicReject的补充,提供一次对多条消息进行拒绝的功能
方法如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
方法参数详细如下:
第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
代码如下
channel.basicNack(envelope.getDeliveryTag(), false, false);
备注
当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
9) 预取消息数量设置
默认情况下,RabbitMQ收到消息后,就向消费者推送。但是如果消息过多,且消息的数量超过了消费者处理能力从而导致其崩溃。此时我们可以通过prefetchCount 限制每个消费者在收到下一个确认回执前一次可以最大接受多少条消息。即如果设置prefetchCount =1,RabbitMQ向这个消费者发送一个消息后,再这个消息的消费者对这个消息进行ack之前,RabbitMQ不会向这个消费者发送新的消息
代码如下
// 每个客户端每次最多获取N个消息
channel.basicQos(n);
10)持久化
为了保证消息的可靠性,需要对消息进行持久化。
为了保证RabbitMQ在重启、奔溃等异常情况下数据没有丢失,除了对消息本身持久化为,还需要将消息传输经过的队列(queue),交互机进行持久化(exchange),持久化以上元素后,消息才算真正RabbitMQ重启不会丢失。
消息持久化
方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
第三个参数props:设置投递模式为持久化,如果此值是persistent ,则此消息存储在磁盘上。如果服务器重启,系统会保证收到的持久化消息未丢失,将消息以持久化方式发布时,会对性能造成一定的影响
消息持久化代码如下:
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
队列持久化
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
详细参数如下:
第二个参数 durable
是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
第三个参数 execulusive
表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
第四个参数 autoDelete
当没有生成者/消费者使用此队列时,此队列会被自动删除。
(即当最后一个消费者退订后即被删除)
代码如下:
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
交换机持久化
以下是声明交换机的方法:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
方法参数说明:
第三个参数durable:交换机是否持久化
重载函数
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException
11) 消息分配策略
多个消费者同时消费同一个队列,Rabbit的消息的分配策略是什么?
如果同一个队列,有多个消费者消费这个队列。RabbitMQ默认是按照轮询的策略发送消息,即发送的顺序是消费者1,消费者2,消费者1,消费者2…。所以平均下来,每个消费者消费的消息数量几乎相同。
12)vhost
To make it possible for a single broker to host multiple isolated "environments" (groups of users, exchanges, queues and so on), AMQP includes the concept of virtual hosts (vhosts). They are similar to virtual hosts used by many popular Web servers and provide completely isolated environments in which AMQP entities live. AMQP clients specify what vhosts they want to use during AMQP connection negotiation.