rabbitMq有四种类型的交换机fanout、direct、topic、headers

实际在RabbitMQ中,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

Exchange是按照什么逻辑将消息路由到Queue的?。

RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,。有四种类型的交换机fanout、direct、topic、headers

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

rabbitMQ 交换机

 

一   fanout类型的交换机(发布订阅模式)

fanout类型交换机会将接收到的消息广播给所有与之绑定的队列。

rabbitMQ 交换机

package com.example.rabbit.web.test.sendandexchange;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/9/30
 *
 * 生产者
 */
public class SendExchangeService {

    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchange";


    /**
     * 生产者
     */
    @Test
    public  void test01() {

        try {
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //申明交换机
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2,"fanout" );

            //创建队列申明
            //channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //每个消费者发送确认消息之前只发一个,不发送下一个
            // channel.basicQos( 1 );

            for (int i = 0;i<30;++i) {
                String message = "消息---"+i;
                //发送消息到交换机
                channel.basicPublish( "",QUEUE_EXANGE_NAME2,null,message.getBytes() );
                System.out.println("发送消息。。。"+message);
            }

            channel.close();
            connection.close();

        } catch (Exception e) {
            e.printStackTrace( );
        }

    }
}

rabbitMQ 交换机

 

 

消费者:

package com.example.rabbit.web.test.exchange.customerexchange;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/9/30
 */
public class CustomerExchangeOne {


    /**
     * 队列么名
     */
    public  String QUEUE_NAME2 = "Emails";

    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchange";
    

    /**
     * 消费消息2
     */
    @Test
    public void test033() {

        try {
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //通过channel指定队列
            channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //交换机类型
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "fanout");

            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,""  );

            //保证只分发一次
            //channel.basicQos( 1 );


            // .创建一个消费者队列consumer,并指定channel
            QueueingConsumer consumer = new QueueingConsumer(channel);

            //7.指定接收者,第二个参数为自动应答,无需手动应答
            channel.basicConsume(QUEUE_NAME2, true, consumer);

            while (true) {
                // 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
            }

        } catch (Exception e) {
            e.printStackTrace( );
        }
        
    }
    
}

 

rabbitMQ 交换机

rabbitMQ 交换机

 

 

二  direct(路由模式)

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

 

rabbitMQ 交换机

 

只有在routing key匹配的队列才可以接受对应的交换机的消息,可以指定多个routing key

生产者

package com.example.rabbit.web.test.routing;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/6/
 */
public class Send {

    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue_direct1";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangedirect";


    /**
     * 生产者
     */
    @Test
    public  void test02() {

        try {
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //申明交换机  和交换机类型
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2,"direct" );

            //创建队列申明
            //channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //每个消费者发送确认消息之前只发一个,不发送下一个
            // channel.basicQos( 1 );


            String message = "消息---"+"direct";

            //发送消息到交换机   定义routingkey
            String routingkey = "error";
            channel.basicPublish( QUEUE_EXANGE_NAME2,routingkey,null,message.getBytes() );
            System.out.println("发送消息。。。"+message);

            channel.close();
            connection.close();

        } catch (Exception e) {
            e.printStackTrace( );
        }

    }
}

 

 

消费者1

package com.example.rabbit.web.test.routing;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/6/
 */
public class CustomerDirect {


    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue_direct2";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangedirect";



    /**
     * 消费消息2
     */
    @Test
    public void test033() throws Exception{
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //通过channel指定队列
            channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //交换机类型
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "direct");

            String routingkey = "error";
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey  );

            //保证只分发一次
            //channel.basicQos( 1 );


            // .创建一个消费者队列consumer,并指定channel
            QueueingConsumer consumer = new QueueingConsumer(channel);

            //7.指定接收者,第二个参数为自动应答,无需手动应答
            channel.basicConsume(QUEUE_NAME2, true, consumer);

            while (true) {
                // 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
            }

    }
}

消费者2

package com.example.rabbit.web.test.routing;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/6/
 */
public class CustomerDirect2 {


    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue_direct2";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangedirect";



    /**
     * 消费消息2
     */
    @Test
    public void test033() throws Exception{
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //通过channel指定队列
            channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //交换机类型
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "direct");

            String routingkey = "error";
            String routingkey1 = "info";
            String routingkey2 = "warning";
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey  );
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey1  );
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey2  );

            //保证只分发一次
            //channel.basicQos( 1 );


            // .创建一个消费者队列consumer,并指定channel
            QueueingConsumer consumer = new QueueingConsumer(channel);

            //7.指定接收者,第二个参数为自动应答,无需手动应答
            channel.basicConsume(QUEUE_NAME2, true, consumer);

            while (true) {
                // 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
            }

    }
}

 

rabbitMQ 交换机

 

rabbitMQ 交换机

 

 

3:topic(主题模式)

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”

binding key与routing key一样也是句点号“. ”分隔的字符串。

binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

rabbitMQ 交换机

rabbitMQ 交换机

 

生产

package com.example.rabbit.web.test.topic;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/6/
 */
public class Send {

    /**
     * # 匹配一个或者多个
     * *  匹配一个
     */



    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue_topic1";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangetopic";


    /**
     * 生产者
     */
    @Test
    public  void test02() {

        try {
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //申明交换机  和交换机类型
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2,"topic" );

            //创建队列申明
            //channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //每个消费者发送确认消息之前只发一个,不发送下一个
            // channel.basicQos( 1 );


            String message = "消息---"+"topic";

            //发送消息到交换机   定义routingkey
            String routingkey = "goods";
            channel.basicPublish( QUEUE_EXANGE_NAME2,routingkey,null,message.getBytes() );
            System.out.println("发送消息。。。"+message);

            channel.close();
            connection.close();

        } catch (Exception e) {
            e.printStackTrace( );
        }

    }
}

 

消费1

package com.example.rabbit.web.test.topic;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/6/
 */
public class CustomerTopic {


    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue_topic1";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangetopic";



    /**
     * 消费消息2
     */
    @Test
    public void test033() throws Exception{
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //通过channel指定队列
            channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //交换机类型
            //channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "topic");

            String routingkey = "good.*";
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey  );

            //保证只分发一次
            //channel.basicQos( 1 );


            // .创建一个消费者队列consumer,并指定channel
            QueueingConsumer consumer = new QueueingConsumer(channel);

            //7.指定接收者,第二个参数为自动应答,无需手动应答
            channel.basicConsume(QUEUE_NAME2, true, consumer);

            while (true) {
                // 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
            }

    }
}

 

 

消费2

package com.example.rabbit.web.test.topic;

import com.example.rabbit.utils.ConnectionFactoryUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;

/**
 * ***GOOD LUCK****
 *
 * @Author : Wukn
 * @Date : 2018/6/
 */
public class CustomerTopic2 {

    /**
     * 队列名
     */
    public  String QUEUE_NAME2 = "test_simple_queue_topic1";


    /**
     * 交换机名
     */
    public  String QUEUE_EXANGE_NAME2 = "test_simple_queue_exchangetopic";



    /**
     * 消费消息2
     */
    @Test
    public void test033() throws Exception{
            //获取连接
            Connection connection = ConnectionFactoryUtil.getConnection( );

            //获取通道
            Channel channel =  connection.createChannel();

            //通过channel指定队列
            channel.queueDeclare(QUEUE_NAME2,false,false,false,null  );

            //交换机类型
            channel.exchangeDeclare(QUEUE_EXANGE_NAME2, "topic");

        /**
         * .#表示匹配所有或者一个
         */
        String routingkey = "goods.#";
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME2,QUEUE_EXANGE_NAME2,routingkey  );

            //保证只分发一次
            //channel.basicQos( 1 );


            // .创建一个消费者队列consumer,并指定channel
            QueueingConsumer consumer = new QueueingConsumer(channel);

            //7.指定接收者,第二个参数为自动应答,无需手动应答
            channel.basicConsume(QUEUE_NAME2, true, consumer);

            while (true) {
                // 从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("【消费者2接收到了\" + QUEUE_NAME + \"中的消息:】---" + message);
            }

    }
}

 

rabbitMQ 交换机

相关文章:

  • 2021-11-14
  • 2021-11-14
  • 2021-11-24
  • 2021-10-19
  • 2021-11-14
  • 2021-12-19
  • 2019-08-30
猜你喜欢
  • 2021-10-19
  • 2019-11-10
  • 2021-11-14
  • 2021-04-10
  • 2021-11-08
  • 2021-07-31
  • 2021-10-19
相关资源
相似解决方案