这是Java并发包提供的最后一个线程池实现,也是最复杂的一个线程池。针对这一部分的代码太复杂,由于目前理解有限,只做简单介绍。通常大家说的Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask(通常实现其三个抽象子类)为任务、ForkJoinWorkerThread作为执行任务的具体线程实体这三者构成的任务调度机制。通俗的说,ForkJoin框架的作用主要是为了实现将大型复杂任务进行递归的分解,直到任务足够小才直接执行,从而递归的返回各个足够小的任务的结果汇集成一个大任务的结果,依次类推最终得出最初提交的那个大型复杂任务的结果,这和方法的递归调用思想是一样的。当然ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制。

对照前面介绍的ThreadPoolExecutor执行的任务是Future的实现类FutureTask、执行线程的实体是内部类Worker,ForkJoinPool执行的任务就是Future的实现类ForkJoinTask、执行线程就是ForkJoinWorkerThread。

ForkJoinWorkerThread

该类直接继承了Thread,但是仅仅是为了增加一些额外的功能,并没有对线程的调度执行做任何更改。ForkJoinWorkerThread是被ForkJoinPool管理的工作线程,在创建出来之后都被设置成为了守护线程,由它来执行ForkJoinTasks。该类主要为了维护创建线程实例时通过ForkJoinPool为其创建的任务队列,与其他两个线程池整个线程池只有一个任务队列不同,ForkJoinPool管理的所有工作线程都拥有自己的工作队列,为了实现任务窃取机制,该队列被设计成一个双端队列,而ForkJoinWorkerThread的首要任务就是执行自己的这个双端任务队列中的任务,其次是窃取其他线程的工作队列,以下是其代码片段:

 1 public class ForkJoinWorkerThread extends Thread {
 2 
 3     final ForkJoinPool pool;                // 这个线程工作的ForkJoinPool池
 4     final ForkJoinPool.WorkQueue workQueue; // 这个线程拥有的工作窃取机制的工作队列
 5 
 6     //创建在给定ForkJoinPool池中执行的ForkJoinWorkerThread。
 7     protected ForkJoinWorkerThread(ForkJoinPool pool) {
 8         // Use a placeholder until a useful name can be set in registerWorker
 9         super("aForkJoinWorkerThread");
10         this.pool = pool;
11         this.workQueue = pool.registerWorker(this); //向ForkJoinPool执行池注册当前工作线程,ForkJoinPool为其分配一个工作队列
12     }
13     
14     //该工作线程的执行内容就是执行工作队列中的任务
15     public void run() {
16         if (workQueue.array == null) { // only run once
17             Throwable exception = null;
18             try {
19                 onStart();
20                 pool.runWorker(workQueue); //执行工作队列中的任务
21             } catch (Throwable ex) {
22                 exception = ex; //记录异常
23             } finally {
24                 try {
25                     onTermination(exception);
26                 } catch (Throwable ex) {
27                     if (exception == null)
28                         exception = ex;
29                 } finally {
30                     pool.deregisterWorker(this, exception); //撤销工作
31                 }
32             }
33         }
34     }
35     
36     .....
37 }
View Code

相关文章: