创建一个工作队列用来在工作者(consumer)间分发耗时任务。

  工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。

  1、 准备

  我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。

  发送端:

 1 public class NewTask  
 2 {  
 3     //队列名称  
 4     private final static String QUEUE_NAME = "workqueue";  
 5   
 6     public static void main(String[] args) throws IOException  
 7     {  
 8         //创建连接和频道  
 9         ConnectionFactory factory = new ConnectionFactory();  
10         factory.setHost("localhost");  
11         Connection connection = factory.newConnection();  
12         Channel channel = connection.createChannel();  
13         //声明队列  
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
15         //发送10条消息,依次在消息后面附加1-10个点  
16         for (int i = 0; i < 10; i++)  
17         {  
18             String dots = "";  
19             for (int j = 0; j <= i; j++)  
20             {  
21                 dots += ".";  
22             }  
23             String message = "helloworld" + dots+dots.length();  
24             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
25             System.out.println(" [x] Sent '" + message + "'");  
26         }  
27         //关闭频道和资源  
28         channel.close();  
29         connection.close();  
30   
31     }  
32   
33   
34 }  

  接收端:

 1 public class Work  
 2 {  
 3     //队列名称  
 4     private final static String QUEUE_NAME = "workqueue";  
 5   
 6     public static void main(String[] argv) throws java.io.IOException,  
 7             java.lang.InterruptedException  
 8     {  
 9         //区分不同工作进程的输出  
10         int hashCode = Work.class.hashCode();  
11         //创建连接和频道  
12         ConnectionFactory factory = new ConnectionFactory();  
13         factory.setHost("localhost");  
14         Connection connection = factory.newConnection();  
15         Channel channel = connection.createChannel();  
16         //声明队列  
17         channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
18         System.out.println(hashCode  
19                 + " [*] Waiting for messages. To exit press CTRL+C");  
20       
21         QueueingConsumer consumer = new QueueingConsumer(channel);  
22         // 指定消费队列  
23         channel.basicConsume(QUEUE_NAME, true, consumer);  
24         while (true)  
25         {  
26             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
27             String message = new String(delivery.getBody());  
28   
29             System.out.println(hashCode + " [x] Received '" + message + "'");  
30             doWork(message);  
31             System.out.println(hashCode + " [x] Done");  
32   
33         }  
34   
35     }  
36   
37     /** 
38      * 每个点耗时1s 
39      * @param task 
40      * @throws InterruptedException 
41      */  
42     private static void doWork(String task) throws InterruptedException  
43     {  
44         for (char ch : task.toCharArray())  
45         {  
46             if (ch == '.')  
47                 Thread.sleep(1000);  
48         }  
49     }  
50 }  

  Round-robin 转发
  使用任务队列的好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。
下面我们先运行3个工作者(Work.java)实例,然后运行NewTask.java,3个工作者实例都会得到信息。  

 1 [x] Sent 'helloworld.1'
 2 [x] Sent 'helloworld..2'
 3 [x] Sent 'helloworld...3'
 4 [x] Sent 'helloworld....4'
 5 [x] Sent 'helloworld.....5'
 6 [x] Sent 'helloworld......6'
 7 [x] Sent 'helloworld.......7'
 8 [x] Sent 'helloworld........8'
 9 [x] Sent 'helloworld.........9'
10 [x] Sent 'helloworld..........10'
11 工作者1:
12 605645 [*] Waiting for messages. To exit press CTRL+C
13 605645 [x] Received 'helloworld.1'
14 605645 [x] Done
15 605645 [x] Received 'helloworld....4'
16 605645 [x] Done
17 605645 [x] Received 'helloworld.......7'
18 605645 [x] Done
19 605645 [x] Received 'helloworld..........10'
20 605645 [x] Done
21 工作者2:
22 18019860 [*] Waiting for messages. To exit press CTRL+C
23 18019860 [x] Received 'helloworld..2'
24 18019860 [x] Done
25 18019860 [x] Received 'helloworld.....5'
26 18019860 [x] Done
27 18019860 [x] Received 'helloworld........8'
28 18019860 [x] Done
29 工作者3:
30 18019860 [*] Waiting for messages. To exit press CTRL+C
31 18019860 [x] Received 'helloworld...3'
32 18019860 [x] Done
33 18019860 [x] Received 'helloworld......6'
34 18019860 [x] Done
35 18019860 [x] Received 'helloworld.........9'
36 18019860 [x] Done
View Code

相关文章: