1.模型

六、rabbitMQ订阅模式(publish_subscribe)

x:交换机(转发器 exchange)

解读:

(1).一个生产者,多个消费者

(2).每个消费者有自己的队列

(3).生产者没有将消息发往队列,而是发到交换机。

(4).每个队列都要绑定在交换机上

(5).生产者发送消息经过交换机到达队列就能实现一个消息被多个消费者消费

2.实例

(1)生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;


public class Produce
{
private static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException 
{
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明交换机   fanout一种分发模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg="我是生产者的消息";
//向交换机发送消息

channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

                System.out.println("消息已经发送");

channel.close();
connection.close();
}
}

如图:六、rabbitMQ订阅模式(publish_subscribe)

疑问:发送消息成功后,消息却丢失了?因为交换机没有存储能力,在rabbitMQ中,只有队列有存储能力,这时候还没有队列绑定到交换机上所以消息丢失。

(2)消费者1

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;


public class Consumer1 {
//队列
private static final String QUEUE_NAME="test_queues_exchange_fanout1";
//交换机
private static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException
{
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

//保证一次只分发一个
channel.basicQos(1);
//定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) 
{
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException 
{
String msg=new String(body,"utf-8");
System.out.println("消费者1===========:"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally 
{
System.out.println("=================consumer1 over================");
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//关闭自动应答
boolean autoAck=false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
}
}

(3)消费者2

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;


public class Consumer2 {
//队列
private static final String QUEUE_NAME="test_queues_exchange_fanout2";
//交换机
private static final String EXCHANGE_NAME="test_exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException
{
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

//保证一次只分发一个
channel.basicQos(1);
//定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) 
{
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException 
{
String msg=new String(body,"utf-8");
System.out.println("消费者2===========:"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally 
{
System.out.println("=================consumer2 over================");
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//关闭自动应答
boolean autoAck=false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
}
}

如图:

六、rabbitMQ订阅模式(publish_subscribe)

   rabbitMQ中生产者声明的交换机中绑定了两个消费者的对列,当生产者向交换机中发送消息的时候,交换机会将消息分别发往两个队列,那么两个消费者就能消费相同数据。

源码地址:https://github.com/Carlutf8/rabbitMQ

3.补充知识(交换机/转发器(exchange)的介绍) 
   (1)功能:
            一方面接收生产者消息,另一方面向队列推送消息
   (2)rabbitMQ的转发类型:         

             a.匿名模式转发

             六、rabbitMQ订阅模式(publish_subscribe)

             如上图,我们发布向rabbitMQ发送消息的时候,设置了空置,为匿名转发。

            b.交换机模式转发

             六、rabbitMQ订阅模式(publish_subscribe)

            交换机的类型:

            b.1  Fanout(不处理路由键)

            六、rabbitMQ订阅模式(publish_subscribe)

                这种类型交换机会把消息发送给任何一个绑定到交换机上的队列中。

              b.2  Direct(处理路由键) 

             六、rabbitMQ订阅模式(publish_subscribe)

               这种模式你发送消息时要带一个路由key,你所绑定的队列也要带一个路由key,那么交换机就会将消息发送到相匹配的路由key中。

              b.3 topic(主题模式)

                  六、rabbitMQ订阅模式(publish_subscribe)

                    将路由key通过通配符绑定到主题队列上。

                     *  匹配一个

                     # 匹配一个或多个





相关文章: