消息队列之RabbitMQ

Posted on 收藏

1.rabbitMQ介绍

rabbitMQ是由erlang语言开发的,基于AMQP协议实现的消息队列。他是一种应用程序之间的通信方法,在分布式系统开发中应用非常广泛。

rabbitMq的有点:

  1. 使用简单,功能强大
  2. 基于AMQP协议
  3. 社区活跃,文档完善
  4. 高并发性能好,erlang语言是专门用于开发高并发程序的
  5. springBoot默认集成rabbitMq

AMQP(advanced Message Queuing Protocol),是一个提供统一消息服务的应用标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件的产品不同和开发语言不同的限制。JMS和AMQP的区别在于:JMS是java语言专属的消息服务标准,他是在api层定义标准,并且只能用于java应用,而AMQP是在协议层定义的标准,是可以跨语言的。

2.工作流程

发送消息:

  1. 生产者和broker建立TCP连接
  2. 生产者和broker建立通道
  3. 生产者通过通道消息发送给broker,由exchange将消息转发
  4. exchange将消息转发给指定的queue

接受消息:

  1. 消费者和broker建立TCP连接
  2. 消费者和broker建立通道
  3. 消费者监听指定的queue
  4. 当有消息到达queue的时候broker默认将消息推送给消费者
  5. 消费者接受到消息并消费

3.安装

消息队列之RabbitMQ
    




消息队列之RabbitMQ

如果不想自己下载,需要我这里的软件的,可以在下面评论邮箱,我私发给你。

1.安装erlang的环境,双击otp的运行程序,然后一路点击下一步(next)。

消息队列之RabbitMQ
    




消息队列之RabbitMQ

配置环境变量

消息队列之RabbitMQ
    




消息队列之RabbitMQ

在path中添加erlang的路径

消息队列之RabbitMQ
    




消息队列之RabbitMQ

2.安装rabbitMq,双击rabbitmq的运行程序

消息队列之RabbitMQ
    




消息队列之RabbitMQ

消息队列之RabbitMQ
    




消息队列之RabbitMQ

安装完成之后在菜单页面可以看到

消息队列之RabbitMQ
    




消息队列之RabbitMQ

安装完RabbitMQ如果想要访问管理页面需要在rabbitmq的sbin目录中使用cmd执行:rabbitmq-plugins.bat enable rabbitmq_management(管理员身份运行此命令)添加可视化插件。

点击上图中的start/stop来开启/停止服务。然后在浏览器上输入地址查看,rabbitMq的默认端口是15672。默认的用户名和密码都是guest

消息队列之RabbitMQ
    




消息队列之RabbitMQ

如果安装失败,需要卸载重装的时候或者出现rabbitMq服务注册失败时,此时需要进入注册表清理erlang(搜索rabbitMQ,erlsrv将对应的项删除)

4.代码实现

1.添加依赖

<!--添加rabbitMq的依赖-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.7.3</version>
</dependency>

2.生产者代码实现

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

/**
 * @className: producer
 * @description: rabbitmq的生产者代码实现
 * @author: charon
 * @create: 2021-01-03 23:10
 */
public class Producer {
    /**
     * 声明队列名
     */
    private static final String QUEUE = "hello charon";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "hello charon good evening";
            // 发布消息(交换机,RoutingKey即队列名,额外的消息属性,消息内容)
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("发送消息给mq:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭资源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3.消费者代码实现

package rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Consumer
 * @description: 消费者的代码实现
 * @author: charon
 * @create: 2021-01-05 08:28
 */
public class Consumer {
    /**
     * 声明队列名
     */
    private static final String QUEUE = "hello charon";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE, true, false, false, null);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String exchange = envelope.getExchange();
//                long deliveryTag = envelope.getDeliveryTag();
                String message = new String(body,"utf-8");
                System.out.println("收到的消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE,true,defaultConsumer);
    }
}

5.rabbitMq的工作模式

  1. Work queues 工作队列(资源竞争)

消息队列之RabbitMQ
    




消息队列之RabbitMQ

​ 生产者将消息放入到队列中,消费者可以有多个,同时监听同一个队列。如上图,消费者c1,c2共同争抢当前消息队列的内容,谁先拿到谁负责消费消息,缺点是在高并发的情况下,默认会产品一个消息被多个消费者共同使用,可以设置一个锁开关,保证一条消息只能被一个消费者使用。

上面的代码,可以再添加一个消费者,这样就可以实现工作队列的工作模式。

2.Publish/Subscribe 发布订阅(共享资源)

消息队列之RabbitMQ
    




消息队列之RabbitMQ

X代表rabbitMq内部组件交换机,生产者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应的消费者拿到消息进行消费,对比工作队列而言,发布订阅可以实现工作队列的功能,但是比工作队列更强大。

特点:
1.每个消费者监听自己的队列
2.生产者将消息发送给Broker,由交换机将消息转发到绑定的此交换机的每个队列,每个绑定交换机的队列都将接收到消息;

生产者:

package rabbitmq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Producer
 * @description: 发布订阅的生产者
 * @author: charon
 * @create: 2021-01-07 22:02
 */
public class Producer {

    /**邮件的队列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**短信的队列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交换机*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式))
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            // 绑定交换机(队列名称,交换机名称,routingKey(发布订阅设置为空))
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
            // 发送多条消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by publish";
                // 指定交换机(交换机,RoutingKey即队列名,额外的消息属性,消息内容)
                channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭资源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费email的消费者:

package rabbitmq.publish;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 邮件的消息消费者
 * @author: charon
 * @create: 2021-01-07 22:14
 */
public class EmailConsumer {

    /**邮件的队列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**短信的队列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交换机*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String exchange = envelope.getExchange();
//                long deliveryTag = envelope.getDeliveryTag();
                String message = new String(body,"utf-8");
                System.out.println("收到的email消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    }
}

消费短信的消费者:

package rabbitmq.publish;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: SmsConsumer
 * @description:
 * @author: charon
 * @create: 2021-01-07 22:17
 */
public class SmsConsumer {


    /**邮件的队列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**短信的队列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交换机*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的短信消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    }
}

3.Routing 路由模式

消息队列之RabbitMQ
    




消息队列之RabbitMQ

生产者将消息发送给交换机按照路由判断,交换机根据路由的key,只能匹配上路由key的对应的消息队列,对应的消费者才能消费消息。

如上图,rabbitMq根据对应的key,将消息发送到对应的队列中,error通知将发送到amqp.gen-S9b上,由消费者c1消费。error,info,warning通知将发送到amqp.gen-Ag1上,由消费者c2消费。

特点:
1.每个消费者监听自己的队列,并且设置路由key
2.生产者将消息发送给交换机,由交换机根据路由key来转发消息到指定的队列

生产者:

package rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Producer
 * @description: 路由模式下的生成者
 * @author: charon
 * @create: 2021-01-07 22:34
 */
public class Producer {

    /**邮件的队列*/
    public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";

    /**短信的队列*/
    public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";

    /**交换机*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 设置email的路由key */
    public static final String ROUTING_EMAIL = "routing_email";

    /** 设置sms的路由key */
    public static final String ROUTING_SMS = "routing_sms";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
            // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式))
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            // 绑定交换机(队列名称,交换机名称,routingKey)
            channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
            channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
            // 发送多条消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by routing --email";
                // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容)
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            }
            // 发送多条消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by routing --sms";
                // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容)
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭资源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费email的消费者:

package rabbitmq.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 路由模式下的email消费者
 * @author: charon
 * @create: 2021-01-07 22:40
 */
public class EmailConsumer {
    /**邮件的队列*/
    public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";

    /**交换机*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 设置email的路由key */
    public static final String ROUTING_EMAIL = "routing_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        // 绑定队列并指明路由key
        channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的email消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
    }
}

消费短信的消费者:

package rabbitmq.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 路由模式下的email消费者
 * @author: charon
 * @create: 2021-01-07 22:40
 */
public class SmsConsumer {
    /**邮件的队列*/
    public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";

    /**交换机*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 设置email的路由key */
    public static final String ROUTING_SMS = "routing_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        // 绑定队列并指明路由key
        channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的短信消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
    }
}

4.Topic 主题模式

消息队列之RabbitMQ
    




消息队列之RabbitMQ

  1. 星号井号代表通配符
  2. 星号代表一个单词,井号代表一个或多个单词
  3. 路由功能添加模糊匹配
  4. 消息产生者产生消息,把消息交给交换机
  5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

特点:
1.每个消费者监听自己的队列,并且设置带通配符的routingkey
2.生产者将消息发送给broker,由交换机及根据路由key来转发消息到指定的队列

5.Header 转发器

取消了路由key,使用header中的key/value(键值对)来匹配队列。

6.RPC 远程调用

消息队列之RabbitMQ
    




消息队列之RabbitMQ

基于direct类型交换机实现。生产者将消息远程发送给rpc队列,消费者监听rpc消息队列的消息并消息,然后将返回结果放入到响应队列中,生产者监听响应队列中的消息,拿到消费者的处理结果,实现远程RPC远程调用。

参考文件:

https://www.cnblogs.com/Jeely/p/10784013.html
https://lovnx.blog.csdn.net/article/details/70991021

1.rabbitMQ介绍

rabbitMQ是由erlang语言开发的,基于AMQP协议实现的消息队列。他是一种应用程序之间的通信方法,在分布式系统开发中应用非常广泛。

rabbitMq的有点:

  1. 使用简单,功能强大
  2. 基于AMQP协议
  3. 社区活跃,文档完善
  4. 高并发性能好,erlang语言是专门用于开发高并发程序的
  5. springBoot默认集成rabbitMq

AMQP(advanced Message Queuing Protocol),是一个提供统一消息服务的应用标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件的产品不同和开发语言不同的限制。JMS和AMQP的区别在于:JMS是java语言专属的消息服务标准,他是在api层定义标准,并且只能用于java应用,而AMQP是在协议层定义的标准,是可以跨语言的。

2.工作流程

发送消息:

  1. 生产者和broker建立TCP连接
  2. 生产者和broker建立通道
  3. 生产者通过通道消息发送给broker,由exchange将消息转发
  4. exchange将消息转发给指定的queue

接受消息:

  1. 消费者和broker建立TCP连接
  2. 消费者和broker建立通道
  3. 消费者监听指定的queue
  4. 当有消息到达queue的时候broker默认将消息推送给消费者
  5. 消费者接受到消息并消费

3.安装

消息队列之RabbitMQ
    




消息队列之RabbitMQ

如果不想自己下载,需要我这里的软件的,可以在下面评论邮箱,我私发给你。

1.安装erlang的环境,双击otp的运行程序,然后一路点击下一步(next)。

消息队列之RabbitMQ
    




消息队列之RabbitMQ

配置环境变量

消息队列之RabbitMQ
    




消息队列之RabbitMQ

在path中添加erlang的路径

消息队列之RabbitMQ
    




消息队列之RabbitMQ

2.安装rabbitMq,双击rabbitmq的运行程序

消息队列之RabbitMQ
    




消息队列之RabbitMQ

消息队列之RabbitMQ
    




消息队列之RabbitMQ

安装完成之后在菜单页面可以看到

消息队列之RabbitMQ
    




消息队列之RabbitMQ

安装完RabbitMQ如果想要访问管理页面需要在rabbitmq的sbin目录中使用cmd执行:rabbitmq-plugins.bat enable rabbitmq_management(管理员身份运行此命令)添加可视化插件。

点击上图中的start/stop来开启/停止服务。然后在浏览器上输入地址查看,rabbitMq的默认端口是15672。默认的用户名和密码都是guest

消息队列之RabbitMQ
    




消息队列之RabbitMQ

如果安装失败,需要卸载重装的时候或者出现rabbitMq服务注册失败时,此时需要进入注册表清理erlang(搜索rabbitMQ,erlsrv将对应的项删除)

4.代码实现

1.添加依赖

<!--添加rabbitMq的依赖-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.7.3</version>
</dependency>

2.生产者代码实现

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

/**
 * @className: producer
 * @description: rabbitmq的生产者代码实现
 * @author: charon
 * @create: 2021-01-03 23:10
 */
public class Producer {
    /**
     * 声明队列名
     */
    private static final String QUEUE = "hello charon";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "hello charon good evening";
            // 发布消息(交换机,RoutingKey即队列名,额外的消息属性,消息内容)
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("发送消息给mq:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭资源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3.消费者代码实现

package rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Consumer
 * @description: 消费者的代码实现
 * @author: charon
 * @create: 2021-01-05 08:28
 */
public class Consumer {
    /**
     * 声明队列名
     */
    private static final String QUEUE = "hello charon";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE, true, false, false, null);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String exchange = envelope.getExchange();
//                long deliveryTag = envelope.getDeliveryTag();
                String message = new String(body,"utf-8");
                System.out.println("收到的消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE,true,defaultConsumer);
    }
}

5.rabbitMq的工作模式

  1. Work queues 工作队列(资源竞争)

消息队列之RabbitMQ
    




消息队列之RabbitMQ

​ 生产者将消息放入到队列中,消费者可以有多个,同时监听同一个队列。如上图,消费者c1,c2共同争抢当前消息队列的内容,谁先拿到谁负责消费消息,缺点是在高并发的情况下,默认会产品一个消息被多个消费者共同使用,可以设置一个锁开关,保证一条消息只能被一个消费者使用。

上面的代码,可以再添加一个消费者,这样就可以实现工作队列的工作模式。

2.Publish/Subscribe 发布订阅(共享资源)

消息队列之RabbitMQ
    




消息队列之RabbitMQ

X代表rabbitMq内部组件交换机,生产者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应的消费者拿到消息进行消费,对比工作队列而言,发布订阅可以实现工作队列的功能,但是比工作队列更强大。

特点:
1.每个消费者监听自己的队列
2.生产者将消息发送给Broker,由交换机将消息转发到绑定的此交换机的每个队列,每个绑定交换机的队列都将接收到消息;

生产者:

package rabbitmq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Producer
 * @description: 发布订阅的生产者
 * @author: charon
 * @create: 2021-01-07 22:02
 */
public class Producer {

    /**邮件的队列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**短信的队列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交换机*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式))
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            // 绑定交换机(队列名称,交换机名称,routingKey(发布订阅设置为空))
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
            // 发送多条消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by publish";
                // 指定交换机(交换机,RoutingKey即队列名,额外的消息属性,消息内容)
                channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭资源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费email的消费者:

package rabbitmq.publish;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 邮件的消息消费者
 * @author: charon
 * @create: 2021-01-07 22:14
 */
public class EmailConsumer {

    /**邮件的队列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**短信的队列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交换机*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String exchange = envelope.getExchange();
//                long deliveryTag = envelope.getDeliveryTag();
                String message = new String(body,"utf-8");
                System.out.println("收到的email消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    }
}

消费短信的消费者:

package rabbitmq.publish;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: SmsConsumer
 * @description:
 * @author: charon
 * @create: 2021-01-07 22:17
 */
public class SmsConsumer {


    /**邮件的队列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**短信的队列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交换机*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的短信消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    }
}

3.Routing 路由模式

消息队列之RabbitMQ
    




消息队列之RabbitMQ

生产者将消息发送给交换机按照路由判断,交换机根据路由的key,只能匹配上路由key的对应的消息队列,对应的消费者才能消费消息。

如上图,rabbitMq根据对应的key,将消息发送到对应的队列中,error通知将发送到amqp.gen-S9b上,由消费者c1消费。error,info,warning通知将发送到amqp.gen-Ag1上,由消费者c2消费。

特点:
1.每个消费者监听自己的队列,并且设置路由key
2.生产者将消息发送给交换机,由交换机根据路由key来转发消息到指定的队列

生产者:

package rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Producer
 * @description: 路由模式下的生成者
 * @author: charon
 * @create: 2021-01-07 22:34
 */
public class Producer {

    /**邮件的队列*/
    public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";

    /**短信的队列*/
    public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";

    /**交换机*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 设置email的路由key */
    public static final String ROUTING_EMAIL = "routing_email";

    /** 设置sms的路由key */
    public static final String ROUTING_SMS = "routing_sms";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
            // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式))
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            // 绑定交换机(队列名称,交换机名称,routingKey)
            channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
            channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
            // 发送多条消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by routing --email";
                // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容)
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            }
            // 发送多条消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by routing --sms";
                // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容)
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭资源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费email的消费者:

package rabbitmq.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 路由模式下的email消费者
 * @author: charon
 * @create: 2021-01-07 22:40
 */
public class EmailConsumer {
    /**邮件的队列*/
    public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";

    /**交换机*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 设置email的路由key */
    public static final String ROUTING_EMAIL = "routing_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        // 绑定队列并指明路由key
        channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的email消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
    }
}

消费短信的消费者:

package rabbitmq.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 路由模式下的email消费者
 * @author: charon
 * @create: 2021-01-07 22:40
 */
public class SmsConsumer {
    /**邮件的队列*/
    public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";

    /**交换机*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 设置email的路由key */
    public static final String ROUTING_SMS = "routing_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web端口默认为15672,通信端口为5672
        connectionFactory.setPort(5672);
        // 设置用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
        channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        // 绑定队列并指明路由key
        channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
        // 实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签
             * @param envelope 信封,可以获取交换机等信息
             * @param properties 消息属性
             * @param body 消费内容,字节数组,可以转成字符串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的短信消息是:"+message);
            }
        };
        // 消费消息(队列名,是否自动确认,消费方法)
        channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
    }
}

4.Topic 主题模式

消息队列之RabbitMQ
    




消息队列之RabbitMQ

  1. 星号井号代表通配符
  2. 星号代表一个单词,井号代表一个或多个单词
  3. 路由功能添加模糊匹配
  4. 消息产生者产生消息,把消息交给交换机
  5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

特点:
1.每个消费者监听自己的队列,并且设置带通配符的routingkey
2.生产者将消息发送给broker,由交换机及根据路由key来转发消息到指定的队列

5.Header 转发器

取消了路由key,使用header中的key/value(键值对)来匹配队列。

6.RPC 远程调用

消息队列之RabbitMQ
    




消息队列之RabbitMQ

基于direct类型交换机实现。生产者将消息远程发送给rpc队列,消费者监听rpc消息队列的消息并消息,然后将返回结果放入到响应队列中,生产者监听响应队列中的消息,拿到消费者的处理结果,实现远程RPC远程调用。

参考文件:

https://www.cnblogs.com/Jeely/p/10784013.html
https://lovnx.blog.csdn.net/article/details/70991021

相关文章:

  • 2021-08-03
  • 2021-09-23
  • 2021-03-31
  • 2021-12-02
猜你喜欢
  • 2021-01-17
  • 2021-11-24
  • 2021-12-06
  • 2021-07-29
  • 2018-11-14
  • 2021-12-23
相关资源
相似解决方案