我们知道,ExecutorService是一个抽象出线程池的一个接口,然后我们在使用线程池的时候,用的是Executors工具类中的一系列newCachedThreadPool() 等类似的方法,这些方法之间返回一个可以用的线程池。但其实这些方法都是在里面调用了一个类——ThreadPoolExecutor,这个是个Java线程池真正的实现类,Executors中的工具方法都是new了这个ThreadPoolExecutro类的对象,只是传了不同的参数进去而已。
所以有必要花点时间研究下这个实现类。
一、类声明:
继承了一个AbstractExecutorService,这个显然就是一个ExecutorService接口的基本抽象实现类。
二、线程池的处理机制
我们想想,要实现一个线程池,是不是有个集合来放线程;然后我们需要用这个线程池去解决我们提交的任务吧,那么如果任务提交得比线程池中的线程多怎么办,那么肯定有个队列来放这些还没来得及给线程运行的任务吧。
2.1 所以,线程池中主要就两个数据结构
一个放线程的集合:
可以看见注解,在对这个结合操作,访问的时候要先获得全局锁——mainLock。
而这个Worker是什么呢?其实就是一个线程的封装类,我们可以简单地把它看作是在线程池中用来提供服务的工作线程的代表,关于Worker这个重要的内部类待会在下面会详细介绍。
还有一个放任务的队列:
这显然是个生产者、消费者的模型吧,多个工作线程就是消费者,然后使用这个线程池的线程比如是main所在的主线程,就是生产者,因为负责submit任务(Runable或者是Callable)给线程池。既然是生产者消费者模型,那么就要考虑生产者和消费者的同步问题,所以这里用的是阻塞队列。
2.2 线程池的处理机制
我们用一张图片来看看线程池的大概处理流程先:
(图片来自:https://www.cnblogs.com/winner-0715/p/7388662.html)
这里的队列说的就是BlockQueue的workQueue也就是我们的任务队列。
由这个流程我们可以看到,有几个边界值:
核心线程池的数量;
任务队列的长度;
最大线程池的数量。
任务队列大家都知道了,那么核心线程池和最大线程池呢??
2.3 在ThreadPoolExecutor类中有两个重要的类变量,
由注释可见,corePoolSize是需要维护的最小的存活的线程数量,可以这样理解,我们现在有一个线程池嘛,如果任务很少,就生产者没怎么生产任务,那么如果维护着很多活着的线程,不就很浪费内存嘛,所以当没任务的时候,这个corePoolSize就是要维持着的最少的或者的线程的数量。
注解还写了,如果你设置了allowCoreThreadTimeOut为true的话,那么这个最小的线程数量可能就为0了,因为这个参数的意思是允许coreThread有时间界限,就即使你的线程数量少于coreThreadPool的数量,但空闲时间(就是在堵塞任务队列堵住的时间)超过一个keepAliveTime(也是一个类变量)的时候,就可以继续减少线程的数量,直至0。
2.4 所以再来更细致地看看这个流程:
1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁);——也就是说一开始直接就新建线程来执行任务
2、如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue;——大于这个corePool后,进来的任务就要在阻塞队列中排队了。
3、如果无法将任务加入BlockingQueue(队列已满并且正在运行的线程数量小于 maximumPoolSize),则创建新的线程来处理任务(需要获得全局锁)
4、如果创建新线程将使当前运行的线程超出maxiumPoolSize(队列已满并且正在运行的线程数量大于或等于 maximumPoolSize),任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法(线程池会抛出异常,告诉调用者"我不能再接受任务了");
线程池采取上述的流程进行设计是为了减少获取全局锁的次数。在线程池完成预热(当前运行的线程数大于或等于corePoolSize)之后,几乎所有的excute方法调用都执行步骤2;
5、当一个线程完成任务时,它会从队列中取下一个任务来执行;
6、当一个线程无事可做(也就是在堵塞的workQueue哪里堵着),超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。(如果设置了allowCoreThreadTimeOut为true,就会停到0)
任务的管理是一件比较容易的事,复杂的是线程的管理,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。
三、在分析源码之前,还要看几个重要的常量还有一些和状态有关的常量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl这个AtomicInteger的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量。这个是很重要的一个变量!!
然后由状态常量可以看见,线程池一共有五个状态:
1、RUNNING:-1<<COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务
2、SHUTDOWN:0<<COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务
3、STOP:1<<COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务
4、TIDYING:2<<COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法
5、TERMINATED:3<<COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态
这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。
3.1 下面的图体现了线程池的状态变化过程:
3.2 还有三个和状态相关的方法:
runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态
workerCountOf(int c)方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量
ctlOf(int rs, int wc)方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl
四、关键execute方法源码分析
这个可以说是线程池ThreadPoolExecutor的核心方法了,不管是线程池的submit还是Executor接口中的execute实现,都是调用这个execute方法。
现在就来分析这个方法,看源码和注解:
总得来说,可以大致分为三种情况,就像上面说的:1. 任务加进来时发现线程数小于corePoolSize;2. 任务加进来时发现线程数>corePoolSize但小于Max,就尝试把任务放进堵塞的workQueue,如果workQueue也满了的话,就新建线程来完成任务;任务加进来的时候,发现这个时候的线程数目大于maxPoolSize了,就交给拒绝机制去reject。
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution RejectedExecutionException是一个RuntimeException * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了 * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展) * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务 */ int c = ctl.get(); /** * 1、如果当前线程数少于corePoolSize(这里没有判断线程池的状态,可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了) */ if (workerCountOf(c) < corePoolSize) { //addWorker()成功,返回,这个addWorker就是添加一个工作线程的意思,因为worker是理解成工作的线程嘛,也就是在workerSet里面添加一个worker,具体的参数到介绍addWorker方法再讲吧。 if (addWorker(command, true)) return; /** * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get()) * 失败的原因可能是: * 1、线程池已经shutdown,shutdown的线程池不再接收新任务 * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize */ c = ctl.get(); } /** * 2、如果线程池RUNNING状态,且入队列成功 */ if (isRunning(c) && workQueue.offer(command)) { //如果到了这里的话,说明上面那if没进去或者是addWorker失败了,没进去说明线程数量大于corePoolSize了;失败了可能线程池状态shutDown,所以要判断线程池 //状态;失败了也可能是线程数量大于CorePoolSize了 //所以这里的if先判断线程池状态,再因为大于corePoolSize了,所以是上面说的情况2,要将新来的任务加入到堵塞队列workQueue中去。 int recheck = ctl.get();//再次校验位 /** * 再次校验放入workerQueue中的任务是否能被执行、 * 这里主要考虑到,把任务加进了这个堵塞队列之后,可能线程池状态又变了,比如关闭了,所以要看double check * 1、如果线程池不是运行状态了,这个时候会执行if里面的remove操作,其实就回滚刚刚的入队操作,从workQueue中删除任务,然后应该拒绝添加新任务, * 2、如果线程池是运行状态,那么remove不会执行到,reject也不会执行到,然后就去下一步只要保证workerSet也就是线程池中有一个线程就好了。 * 3、如果线程池不是运行状态,然后从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),那也是去下一步确保还有线程执行任务(只要有一个就够了) */ //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command if (! isRunning(recheck) && remove(command)) reject(command); //如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null //为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢?? //只保证有一个worker线程可以从queue中获取任务执行就行了?? //因为只要还有活动的worker线程,就可以消费workerQueue中的任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask //第二个参数为true代表占用corePoolSize,false占用maxPoolSize } /** * 3、如果线程池不是running状态 或者 无法入队列 * 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command */ else if (!addWorker(command, false)) //这个else呢,代表着要么不是running状态,这个时候执行addWorker肯定是失败的,addWofker里面会判断线程池状态,所以拒绝//任务; //要么呢就是线程池是running状态,但加入任务的堵塞队列失败,说明堵塞队列也满了,也就是上面说的情况3,创建新工作worker来执行任务,如果这个失败,说明 //线程数目已经大于maxPoolSize了,就reject咯 reject(command); }