创建一个工作队列用来在工作者(consumer)间分发耗时任务。
工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。
1、 准备
我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。
发送端:
1 public class NewTask 2 { 3 //队列名称 4 private final static String QUEUE_NAME = "workqueue"; 5 6 public static void main(String[] args) throws IOException 7 { 8 //创建连接和频道 9 ConnectionFactory factory = new ConnectionFactory(); 10 factory.setHost("localhost"); 11 Connection connection = factory.newConnection(); 12 Channel channel = connection.createChannel(); 13 //声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 //发送10条消息,依次在消息后面附加1-10个点 16 for (int i = 0; i < 10; i++) 17 { 18 String dots = ""; 19 for (int j = 0; j <= i; j++) 20 { 21 dots += "."; 22 } 23 String message = "helloworld" + dots+dots.length(); 24 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 25 System.out.println(" [x] Sent '" + message + "'"); 26 } 27 //关闭频道和资源 28 channel.close(); 29 connection.close(); 30 31 } 32 33 34 }
接收端:
1 public class Work 2 { 3 //队列名称 4 private final static String QUEUE_NAME = "workqueue"; 5 6 public static void main(String[] argv) throws java.io.IOException, 7 java.lang.InterruptedException 8 { 9 //区分不同工作进程的输出 10 int hashCode = Work.class.hashCode(); 11 //创建连接和频道 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setHost("localhost"); 14 Connection connection = factory.newConnection(); 15 Channel channel = connection.createChannel(); 16 //声明队列 17 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 18 System.out.println(hashCode 19 + " [*] Waiting for messages. To exit press CTRL+C"); 20 21 QueueingConsumer consumer = new QueueingConsumer(channel); 22 // 指定消费队列 23 channel.basicConsume(QUEUE_NAME, true, consumer); 24 while (true) 25 { 26 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 27 String message = new String(delivery.getBody()); 28 29 System.out.println(hashCode + " [x] Received '" + message + "'"); 30 doWork(message); 31 System.out.println(hashCode + " [x] Done"); 32 33 } 34 35 } 36 37 /** 38 * 每个点耗时1s 39 * @param task 40 * @throws InterruptedException 41 */ 42 private static void doWork(String task) throws InterruptedException 43 { 44 for (char ch : task.toCharArray()) 45 { 46 if (ch == '.') 47 Thread.sleep(1000); 48 } 49 } 50 }
Round-robin 转发
使用任务队列的好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。
下面我们先运行3个工作者(Work.java)实例,然后运行NewTask.java,3个工作者实例都会得到信息。
1 [x] Sent 'helloworld.1' 2 [x] Sent 'helloworld..2' 3 [x] Sent 'helloworld...3' 4 [x] Sent 'helloworld....4' 5 [x] Sent 'helloworld.....5' 6 [x] Sent 'helloworld......6' 7 [x] Sent 'helloworld.......7' 8 [x] Sent 'helloworld........8' 9 [x] Sent 'helloworld.........9' 10 [x] Sent 'helloworld..........10' 11 工作者1: 12 605645 [*] Waiting for messages. To exit press CTRL+C 13 605645 [x] Received 'helloworld.1' 14 605645 [x] Done 15 605645 [x] Received 'helloworld....4' 16 605645 [x] Done 17 605645 [x] Received 'helloworld.......7' 18 605645 [x] Done 19 605645 [x] Received 'helloworld..........10' 20 605645 [x] Done 21 工作者2: 22 18019860 [*] Waiting for messages. To exit press CTRL+C 23 18019860 [x] Received 'helloworld..2' 24 18019860 [x] Done 25 18019860 [x] Received 'helloworld.....5' 26 18019860 [x] Done 27 18019860 [x] Received 'helloworld........8' 28 18019860 [x] Done 29 工作者3: 30 18019860 [*] Waiting for messages. To exit press CTRL+C 31 18019860 [x] Received 'helloworld...3' 32 18019860 [x] Done 33 18019860 [x] Received 'helloworld......6' 34 18019860 [x] Done 35 18019860 [x] Received 'helloworld.........9' 36 18019860 [x] Done