前面介绍了ForkJoinPool相关的两个类ForkJoinTask、ForkJoinWorkerThread,现在开始了解ForkJoinPool。ForkJoinPool也是实现了ExecutorService的线程池。但ForkJoinPool不同于其他类型的ExecutorService,主要是因为它使用了窃取工作机制:池中的所有线程都试图查找和执行提交给池和/或由其他活动任务创建的任务(如果不存在工作,则最终阻塞等待工作)。但ForkJoinPool并不是为了代替其他两个线程池,大家所适用的场景各不相同。ForkJoinPool主要是为了执行ForkJoinTask而存在,而ForkJoinTask在上一文已经讲过是一种可以将任务进行递归分解执行从而提高执行并行度的任务,那么ForkJoinPool线程池当然主要就是为了完成这些可递归分解任务的调度执行,加上一些对线程池生命周期的控制,以及提供一些对池的状态检查方法(例如getStealCount),用于帮助开发、调优和监视fork/join应用程序。同样,方法toString以方便的形式返回池状态的指示,以便进行非正式监视。

由于ForkJoinPool的源码太长并且其中涉及到的设计实现非常复杂,目前理解有限,只能做大概的原理阐述以及一些使用示例。

ForkJoinPool原理

在原理开始之前,先了解一下其构造方法,其参数最多的一个构造方法如下:

1 public ForkJoinPool(int parallelism,
2                 ForkJoinWorkerThreadFactory factory,
3                 UncaughtExceptionHandler handler,
4                 boolean asyncMode) {
5     //...........                
6 }

构造ForkJoinPool可以指定如下四个参数:

parallelism: 并行度,默认为CPU核心数,最小为1。为1的时候相当于单线程执行。

factory:工作线程工厂,用于创建ForkJoinWorkerThread。

handler:处理工作线程运行任务时的异常情况类,默认为null。

asyncMode:是否为异步模式,默认为 false。这里的同步/异步并不是指F/J框架本身是采用同步模式还是采用异步模式工作,而是指其中的工作线程的工作方式。在F/J框架中,每个工作线程(Worker)都有一个属于自己的任务队列(WorkQueue),这是一个底层采用数组实现的双向队列。同步是指:对于工作线程(Worker)自身队列中的任务,采用后进先出(LIFO)的方式执行;异步是指:对于工作线程(Worker)自身队列中的任务,采用先进先出(FIFO)的方式执行。为true的异步模式只在不join任务结果的消息传递框架中非常有用,因此一般如果任务的结果需要通过join合并,则该参数都设为false。

创建 ForkJoinPool实例除了使用构造方法外,从JDK8开始,还提供了一个静态的commonPool(),该方法可以通过指定系统参数的方式(System.setProperty(?,?))定义“并行度、线程工厂和异常处理类”;并且它使用的是同步模式,也就是说可以支持任务合并(join)。使用该方法返回的ForkJoinPool实例一般来说都能满足大多数的使用场景。

内部数据结构

ForkJoinPool采用了哈希数组 + 双端队列的方式存放任务,但这里的任务分为两类,一类是通过execute、submit 提交的外部任务,另一类是ForkJoinWorkerThread工作线程通过fork/join分解出来的工作任务,ForkJoinPool并没有把这两种任务混在一个任务队列中,对于外部任务,会利用Thread内部的随机probe值映射到哈希数组的偶数槽位中的提交队列中,这种提交队列是一种数组实现的双端队列称之为Submission Queue,专门存放外部提交的任务。对于ForkJoinWorkerThread工作线程,每一个工作线程都分配了一个工作队列,这也是一个双端队列,称之为Work Queue,这种队列都会被映射到哈希数组的奇数槽位,每一个工作线程fork/join分解的任务都会被添加到自己拥有的那个工作队列中。

在ForkJoinPool中的属性 WorkQueue[] workQueues 就是我们所说的哈希数组,其元素就是内部类WorkQueue实现的基于数组的双端队列。该哈希数组的长度为2的幂,并且支持扩容。如下就是该哈希数组的示意结构图:

Java并发包线程池之ForkJoinPool即ForkJoin框架(二)

如图,提交队列位于哈希数组workQueue的奇数索引槽位,工作线程的工作队列位于偶数槽位,默认情况下,asyncMode为false,因此工作线程把工作队列当着栈一样使用,将分解的子任务推入工作队列的top端,取任务的时候也从top端取(前面关于双端队列的介绍中,凡是双端队列都会有两个分别指向队列两端的指针,这里就是图上画出的base和top),而当某些工作线程的任务为空的时候,就会从其他队列(不限于workQueue,也会是提交队列)窃取(steal)任务,如图示拥有workQueue2的工作线程从workQueue1中窃取了一个任务,窃取任务的时候采用的是先进先出FIFO的策略(即从base端窃取任务),这样不但可以避免在取任务的时候与拥有其队列的工作线程发生冲突,从而减小竞争,还可以辅助其完成比较大的任务。而当asyncMode为true的话,拥有该工作队列的工作线程将按照先进先出的策略从base端取任务,这一般只用于不需要返回结果的任务,或者事件消息传递框架。

上图展示了提交任务与分解的任务在ForkJoinPool内部的组织形式,并且简单的阐述了工作窃取机制的原理,其实,工作窃取机制的实现过程还包含很多细节,并没有怎么简单,例如还有一种“互助机制”,假设工作线程2窃取了工作线程1的任务之后,通过fork/join又分解产生了子任务,这些子任务会进入工作线程2的工作队列中,这时候如果工作线程1把剩余的任务都完成了,当他发现自己的任务被别人窃取的话,那么它会试着去窃取工作线程2的任务,(你偷了我的,现在我就要偷你的),这就是互助机制。

关于ForkJoinPool的源码太复杂就不分析了,可以参考如下几篇文章:https://www.jianshu.com/apps?utm_medium=desktop&utm_source=navbar-apps 和https://www.cnblogs.com/zhuxudong/p/10122688.html

一些public方法

 除了同ThreadPoolExecutor线程池一样重写了AbstractExecutorService的一些方法例如,submit(异步提交任务返回Future)、execute(异步提交任务无返回值)、invokeAll(批量执行该类没有重写)、invokeAny(只要有一个执行结束就返回,该类没有重写)等方法之外,ForkJoinPool增加了一些自己特有的用于监视其状态的方法可供使用者调用,但一般来说这些方法都用不上,除了那些提交任务的方法:

awaitQuiescence,等待线程池空闲

isQuiescent,如果线程池处于空闲状态,返回true。

getActiveThreadCount,获取正在执行任务或窃取任务的线程个数。

getQueuedSubmissionCount(),获取提交提交给线程池但还没开始执行的任务个数。就是所有提交队列中任务之和。

hasQueuedSubmissions(),若getQueuedSubmissionCount不为0,返回true,表示存在任务还在提交队列没被执行。

getQueuedTaskCount(),返回所有工作线程工作队列中的所有任务个数。

getRunningThreadCount(),返回正在运行的没有被阻塞(例如调用Join会被阻塞)的线程个数。

getStealCount(),返回线程从另一个线程的工作队列中窃取的任务总数的估计值。

getParallelism(),返回线程池的并行度,构造方法中有这个参数。

getAsyncMode(),返回工作线程从其任务队列中取走任务的模式,默认为false,表示LIFO后进先出,否则是FIFO,构造方法中有这个参数。

使用示例

对于Fork/Join的使用,也是围绕ForkJoinTask的三个抽象子类的不同作用进行的,我们知道ForkJoinTask的三个子类是RecursiveAction、RecursiveTask和CountedCompleter,分别用于不返回结果,返回结果以及完成触发指定操作的操作。

对于RecursiveAction的使用,最容易让人想到的例子就是,最一个集合或者数组中的所有元素进行自增运行的操作:

 1 public class IncrementTask extends RecursiveAction {
 2 
 3     static final int THRESHOLD = 10;
 4     final long[] array;
 5     final int lo, hi;
 6 
 7     IncrementTask(long[] array, int lo, int hi) { // 构造方法,指定数组下标范围
 8         this.array = array;
 9         this.lo = lo;
10         this.hi = hi;
11     }
12 
13     // 实现抽象类的接口,这个任务所执行的主要计算。
14     protected void compute() {
15         if (hi - lo < THRESHOLD) { // 数组索引区间小于阈值(任务足够小)
16             for (int i = lo; i < hi; ++i)
17                 array[i]++; // 对每个元素自增1
18         } else {
19             int mid = (lo + hi) >>> 1; // 拆分一半
20             invokeAll(new IncrementTask(array, lo, mid), new IncrementTask(array, mid, hi)); // 一起执行
21         }
22     }
23     
24     public static void main(String[] args) {
25         long[] array = ... ;//一个很长的数组
26         new IncrementTask(array, 0, array.length).invoke(); //隐式的使用了ForkJoinPool.commonPool()
27     }
28 }
View Code

相关文章: