- 工作队列
使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。

本示例主要演示显示的功能:

定义交换机
多个消费者同时订阅一个队列
模式采用手动应答

RabbitMQ入门教程(四):工作队列(Work Queues)
代码如下:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 工作队列的演示
 * @author Dell
 *
 */
public class WordQueues {
	
	@Test
	public void basicPublish() throws IOException, TimeoutException {
		// 创建连接工厂
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setUsername("guest");
		factory.setPassword("guest");
		
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		
		// 声明队列
		String QUEUE_NAME = "queue work";
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		
		// 交换机名
		String EXCHANGE_NAME = "amqp.rabbit.work";
		// 声明交换机,指定交换机名和类型(BuiltinExchangeType.DIRECT:直连)
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		
		// 路由键名
		String ROUTING_KEY = "task";
		// 根据队列名、交换机名、路由键名,绑定队列和交换机,不使用额外的参数
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
		
		// 循环发布多条消息
		for (int i = 0; i < 10; i++) {
			// 消息内容
			String message = "Hello RabbitMQ" + i;
			// 发布消息
			channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, false, null, message.getBytes("UTF-8"));
		}
		
		// 关闭资源
		channel.close();
		conn.close();
	}
	
	@Test
	public void basicConsume1() throws IOException, TimeoutException, InterruptedException {
		// 创建连接工厂
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setUsername("guest");
		factory.setPassword("guest");
		
		// 创建连接
		Connection conn = factory.newConnection();
		// 创建信道
		final Channel channel = conn.createChannel();
		/*
		 * 设置每次从队列获取消息的数量
		 * 如果不设置,RabbitMQ就会采用 轮询方式 分派消息给每个消费者,而不考虑消费者的处理能力
		 * 设置后,会采取 公平转发 的方式分派消息,即:能者多劳,处理完的继续处理下一个消息
		 */
		channel.basicQos(1);
		
		// 创建队列
		String QUEUE_NAME = "queue work";
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		System.out.println("Consumer1 waiting receive message...");
		
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
					throws IOException {
				// 获取消息
				String message = new String(body, "UTF-8");
				try {
					doWork1(message);
					// 手动应答
					channel.basicAck(envelope.getDeliveryTag(), false);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		
		// 订阅消息,false:表示手动应答,需要显示调用basicAck()来应答
		channel.basicConsume(QUEUE_NAME, false, consumer);
		
		// 不让程序立即结束,这样还有机会消费获取第二条消息
		Thread.sleep(1000000);
	}
	
	@Test
	public void basicConsume2() throws IOException, TimeoutException, InterruptedException {
		// 创建连接工厂
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setUsername("guest");
		factory.setPassword("guest");
		
		// 创建连接
		Connection conn = factory.newConnection();
		// 创建信道
		final Channel channel = conn.createChannel();
		// 设置每次从队列获取消息的数量
		//channel.basicQos(1);
		
		// 创建队列
		String QUEUE_NAME = "queue work";
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		System.out.println("Consumer2 waiting receive message...");
		
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
					throws IOException {
				// 获取消息
				String message = new String(body, "UTF-8");
				try {
					doWork2(message);
					// 手动应答
					channel.basicAck(envelope.getDeliveryTag(), false);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		
		// 订阅消息,false:表示手动应答,需要显示调用basicAck()来应答
		channel.basicConsume(QUEUE_NAME, false, consumer);
		
		// 不让程序立即结束,这样还有机会消费获取第二条消息
		Thread.sleep(1000000);
	}
	
	/**
	 * 模拟处理消息
	 * @param message
	 * @throws InterruptedException 
	 */
	private void doWork1(String message) throws InterruptedException {
		System.out.println("[C] Received " + message + ",处理业务中...");
		// 模仿消费者处理业务,同时也让其他消费者有机会获取消息,实际业务中不需要
		Thread.sleep(1000);
	}
	
	private void doWork2(String message) throws InterruptedException {
		System.out.println("[C] Received " + message + ",处理业务中...");
		// 模仿消费者处理业务,同时也让其他消费者有机会获取消息,实际业务中不需要
		Thread.sleep(100000);
	}

}

运行结果
为了能让两个消费者均分消息,需要先启动消费者,最后再启动生产者
RabbitMQ入门教程(四):工作队列(Work Queues)

RabbitMQ入门教程(四):工作队列(Work Queues)
两个消费者轮流消费消息

交换机的直连接类型direct
交换机的类型用于交换机如何将消息路由到哪些队列中去,在绑定队列时channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY)会将队列名、交换机名称、路由键作为一条记录插入到数据库中,当发布消息时channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(“UTF-8”))会指定交换机名称和路由键,如果将交换机名+路由键 作为联合主键,根据这个联合组件就能查到队列名(EXCHANGE_NAME + ROUTING_KEY)—》QUEUE_NAME, 从而将消息发送到指定的队列,直连接的规则就是 绑定的路由键和发布消息的路由键必须完全相等,完全相等,完全相等,重要的事情说三遍, 才能路由到关联的队列上,用java代码表示 routingKeyForPublishMessage.equals(routingKeyForQueueBind),
直连接就是完全相等,像其他类型可以类似于正则表达式或是数据库中的like的模糊匹配,只要模糊匹配成功就可以将消息路由到指定的队列中。

适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

临时队列(Temporary queues)
Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。

String queueName = channel.queueDeclare().getQueue();
一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
    // 底层会随机产生一个队列名称
    return queueDeclare("", false, true, true, null);
}

轮询分发(Round-robin)
在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个的分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

当消息进入队列,RabbitMQ就会分派消息。它不看消费者的应答的数目,也不关心消费者处理消息的能力,只是盲目的暴力的将第n条消息发给第n个消费者。

公平转发(Fair dispatch)
当工作队列中有两个消费者中,可以看到消费者1收到的消息都是偶数条,消费者1都是奇数条,假如偶数条的消息处理比较耗时,奇数条的消息处理很快耗时短,当有多条消息在队列中,队列一下子就把所有奇数条消息推送给消费者1,把所有偶数条消息推送给消费者2,由于消费者1处理的消息不叫耗时,消费者2处理比较快,很可能出现当消费者1才处理几条的时,消费者2就已经完全处理了,这样消费者2就处理空闲状态,而消费者1却忙的跟狗似的。为了解决这种现象,让干的快的干完了帮助干的慢的分担点任务,RabbitMQ采用限制消费者一次从队列中获取消息的条数,而不是一下子把满足条件的消息都推送个某个消费者,通过使用channel.basicQos(1)告诉消费者一次只能从队列中预先获取一条(预提取数量prefetchCount),处理完了再获取另一条,这样其他消息仍然在队列中,还没有被分发出去,这样就会造成处理消息慢的继续处理当前消息,处理消息的快的由于一次只能从队列中获取一条,处理完继续从队列中获取,这样就会出现能者多劳,大家谁都不会闲着。使用公平转发这种方式支持动态添加消费者,比如队列中的消息很多,两个消费者处理不过来,需要再增加消费者来处理,由于消息还在队列中,还没有被分发出去,这样再增加消费者,消费者就能马上从队列中获取消息,立即投入进来工作。

RabbitMQ入门教程(四):工作队列(Work Queues)

当将channel.basicQos(1);这行代码注释后,RabbitMQ由轮询分发模式变为公平分发模式,此时,消费者处理消息越快,消费的消息也就越多。

消息持久化(Message durability)
默认队列和消息都是放在内存中的,当RabbitMQ退出或者崩溃,将会丢失队列和消息。为了保证即使RabbitMQ崩溃也不会丢失消息,我们必须把“队列”和“消息”设为持久化,当队列和消息持久化以后即使RabbitMQ崩溃,消息还存在数据库中,当RabbitMQ再次启动的时候,队列和消息仍然还在。

// 队列持久化
boolean durable = true;  
channel.queueDeclare("hello", durable, false, false, null); 

// 消息持久化 方式一
channel.basicPublish("", "key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

// 消息持久化 方式二
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);  // 设置消息是否持久化,1: 非持久化 2:持久化
channel.basicPublish("", "key", properties.build(), message.getBytes("UTF-8"));

RabbitMQ入门教程(四):工作队列(Work Queues)
什么时候使用持久化?首先你需要分析并测试性能需求,通常关键的消息一般都使用持久化。

相关文章: