rabbitMq有四种类型的交换机fanout、direct、topic、headers
实际在RabbitMQ中,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Exchange是按照什么逻辑将消息路由到Queue的?。
RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,。有四种类型的交换机fanout、direct、topic、headers
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。
一 fanout类型的交换机(发布订阅模式)
fanout类型交换机会将接收到的消息广播给所有与之绑定的队列。
package com.example.rabbit.web.test.sendandexchange;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/9/30
*
* 生产者
*/
public class SendExchangeService {
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchange";
/**
* 生产者
*/
@Test
public void test01() {
try {
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//申明交换机
channel.exchangeDeclare(QUEUE_EXANGE_NAME2,"fanout" );
//创建队列申明
//channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//每个消费者发送确认消息之前只发一个,不发送下一个
// channel.basicQos( 1 );
for (int i = 0;i<30;++i) {
String message = "消息---"+i;
//发送消息到交换机
channel.basicPublish( "",QUEUE_EXANGE_NAME2,null,message.getBytes() );
System.out.println("发送消息。。。"+message);
}
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace( );
}
}
}
消费者:
package com.example.rabbit.web.test.exchange.customerexchange;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/9/30
*/
public class CustomerExchangeOne {
/**
* 队列么名
*/
public String QUEUE_NAME2 = "Emails";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchange";
/**
* 消费消息2
*/
@Test
public void test033() {
try {
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//通过channel指定队列
channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//交换机类型
channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "fanout");
//绑定队列到交换机
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,"" );
//保证只分发一次
//channel.basicQos( 1 );
// .创建一个消费者队列consumer,并指定channel
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME2, true, consumer);
while (true) {
// 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
}
} catch (Exception e) {
e.printStackTrace( );
}
}
}
二 direct(路由模式)
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
只有在routing key匹配的队列才可以接受对应的交换机的消息,可以指定多个routing key
生产者
package com.example.rabbit.web.test.routing;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/6/
*/
public class Send {
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue_direct1";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangedirect";
/**
* 生产者
*/
@Test
public void test02() {
try {
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//申明交换机 和交换机类型
channel.exchangeDeclare(QUEUE_EXANGE_NAME2,"direct" );
//创建队列申明
//channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//每个消费者发送确认消息之前只发一个,不发送下一个
// channel.basicQos( 1 );
String message = "消息---"+"direct";
//发送消息到交换机 定义routingkey
String routingkey = "error";
channel.basicPublish( QUEUE_EXANGE_NAME2,routingkey,null,message.getBytes() );
System.out.println("发送消息。。。"+message);
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace( );
}
}
}
消费者1
package com.example.rabbit.web.test.routing;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/6/
*/
public class CustomerDirect {
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue_direct2";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangedirect";
/**
* 消费消息2
*/
@Test
public void test033() throws Exception{
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//通过channel指定队列
channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//交换机类型
channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "direct");
String routingkey = "error";
//绑定队列到交换机
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey );
//保证只分发一次
//channel.basicQos( 1 );
// .创建一个消费者队列consumer,并指定channel
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME2, true, consumer);
while (true) {
// 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
}
}
}
消费者2
package com.example.rabbit.web.test.routing;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/6/
*/
public class CustomerDirect2 {
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue_direct2";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangedirect";
/**
* 消费消息2
*/
@Test
public void test033() throws Exception{
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//通过channel指定队列
channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//交换机类型
channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "direct");
String routingkey = "error";
String routingkey1 = "info";
String routingkey2 = "warning";
//绑定队列到交换机
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey );
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey1 );
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey2 );
//保证只分发一次
//channel.basicQos( 1 );
// .创建一个消费者队列consumer,并指定channel
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME2, true, consumer);
while (true) {
// 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
}
}
}
3:topic(主题模式)
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key与routing key一样也是句点号“. ”分隔的字符串。
binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
生产
package com.example.rabbit.web.test.topic;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/6/
*/
public class Send {
/**
* # 匹配一个或者多个
* * 匹配一个
*/
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue_topic1";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangetopic";
/**
* 生产者
*/
@Test
public void test02() {
try {
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//申明交换机 和交换机类型
channel.exchangeDeclare(QUEUE_EXANGE_NAME2,"topic" );
//创建队列申明
//channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//每个消费者发送确认消息之前只发一个,不发送下一个
// channel.basicQos( 1 );
String message = "消息---"+"topic";
//发送消息到交换机 定义routingkey
String routingkey = "goods";
channel.basicPublish( QUEUE_EXANGE_NAME2,routingkey,null,message.getBytes() );
System.out.println("发送消息。。。"+message);
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace( );
}
}
}
消费1
package com.example.rabbit.web.test.topic;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/6/
*/
public class CustomerTopic {
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue_topic1";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangetopic";
/**
* 消费消息2
*/
@Test
public void test033() throws Exception{
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//通过channel指定队列
channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//交换机类型
//channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "topic");
String routingkey = "good.*";
//绑定队列到交换机
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey );
//保证只分发一次
//channel.basicQos( 1 );
// .创建一个消费者队列consumer,并指定channel
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME2, true, consumer);
while (true) {
// 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
}
}
}
消费2
package com.example.rabbit.web.test.topic;
import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;
/**
* ***GOOD LUCK****
*
* @Author : Wukn
* @Date : 2018/6/
*/
public class CustomerTopic2 {
/**
* 队列名
*/
public String QUEUE_NAME2 = "test_simple_queue_topic1";
/**
* 交换机名
*/
public String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangetopic";
/**
* 消费消息2
*/
@Test
public void test033() throws Exception{
//获取连接
Connection connection = ConnectionFactoryUtil.getConnection( );
//获取通道
Channel channel = connection.createChannel();
//通过channel指定队列
channel.queueDeclare(QUEUE_NAME2,false,false,false,null );
//交换机类型
channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "topic");
/**
* .#表示匹配所有或者一个
*/
String routingkey = "goods.#";
//绑定队列到交换机
channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey );
//保证只分发一次
//channel.basicQos( 1 );
// .创建一个消费者队列consumer,并指定channel
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME2, true, consumer);
while (true) {
// 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
}
}
}