ScheduledThreadPoolExecutor,它是一个计划任务线程池,可以执行定时任务或者是计划任务。
ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,需要了解 ThreadPoolExecutor 的原理,参考:【Java多线程】线程池ThreadPoolExecutor实现原理(二十二)
ScheduledThreadPoolExecutor 中的任务队列使用了 阻塞式延迟队列 (DelayedWorkQueue),参考:【Java多线程】DelayQueue源码分析 (二十六)
了解以上2个知识点之后,能更好的理解ScheduledThreadPoolExecutor的原理
参考:【Java多线程】ScheduledThreadPoolExecutor详解(二十八)
本章是对其进行补充
一、ScheduledThreadPoolExecutor原理图
任务传递示意图
二、属性
由于 ScheduledThreadPoolExecutor 是继承了 ThreadPoolExecutor,ThreadPoolExecutor有的属性它都有,以下是另外增加的属性
1 public class ScheduledThreadPoolExecutor 2 extends ThreadPoolExecutor 3 implements ScheduledExecutorService { 4 5 // shutdown后继续存在周期任务 6 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 7 8 // shutdown后执行存在延迟任务 9 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 10 11 // 取消状态下移除标识 12 private volatile boolean removeOnCancel = false; 13 14 // 序列器 15 private static final AtomicLong sequencer = new AtomicLong(); 16 17 ...... 18 }
三、方法
在ScheduledThreadPoolExecutor线程池提交任务执行,主要使用的ScheduledThreadPoolExecutor.java、ThreadPoolExecutor.java、ScheduledFutureTask.java、FutureTask.java、DelayedWorkQueue.java等相关类方法,如下:
1、构造方法
1 // 根据核心线程数创建 ScheduledThreadPool 线程池 2 public ScheduledThreadPoolExecutor(int corePoolSize) { 3 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 4 new DelayedWorkQueue()); 5 } 6 7 // 根据核心线程数、线程工厂创建 ScheduledThreadPool 线程池 8 public ScheduledThreadPoolExecutor(int corePoolSize, 9 ThreadFactory threadFactory) { 10 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 11 new DelayedWorkQueue(), threadFactory); 12 } 13 14 // 根据核心线程数、拒绝策略创建 ScheduledThreadPool 15 public ScheduledThreadPoolExecutor(int corePoolSize, 16 RejectedExecutionHandler handler) { 17 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 18 new DelayedWorkQueue(), handler); 19 } 20 21 // 根据核心线程数、线程工厂、拒绝策略创建 ScheduledThreadPool 线程池 22 public ScheduledThreadPoolExecutor(int corePoolSize, 23 ThreadFactory threadFactory, 24 RejectedExecutionHandler handler) { 25 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 26 new DelayedWorkQueue(), threadFactory, handler); 27 }
2、schedule() 方法
1 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 2 long delay, 3 TimeUnit unit) { 4 if (callable == null || unit == null) 5 throw new NullPointerException(); 6 // 新建一个 RunnableScheduledFuture 对象 7 RunnableScheduledFuture<V> t = decorateTask(callable, 8 new ScheduledFutureTask<V>(callable, 9 triggerTime(delay, unit))); 10 11 // 延迟执行任务 12 delayedExecute(t); 13 return t; 14 }
由上可以看出,主要是新建了一个ScheduledFutureTask对象,然后点用延迟执行任务方法 delayedExecute(t)
3、decorateTask() 方法
1 // 包装任务成一个 RunnableScheduledFuture 对象 2 // 实际就是返回了task对象 3 protected <V> RunnableScheduledFuture<V> decorateTask( 4 Callable<V> callable, RunnableScheduledFuture<V> task) { 5 return task; 6 }
返回任务对象
4、delayedExecute() 方法
1 private void delayedExecute(RunnableScheduledFuture<?> task) { 2 // 线程池状态是否 Shutdown 3 if (isShutdown()) 4 // 拒绝任务 5 reject(task); 6 // 7 else { 8 // 获取任务队列,添加任务 9 super.getQueue().add(task); 10 // isShutdown() 再次判断任务状态 11 // canRunInCurrentRunState() 是否在当前状态下能运行 12 // remove() 移除任务 13 if (isShutdown() && 14 !canRunInCurrentRunState(task.isPeriodic()) && 15 remove(task)) 16 // 任务取消 17 task.cancel(false); 18 else 19 // 确保线程池能启动任务 20 ensurePrestart(); 21 } 22 }
主要是将任务添加到队列中,然后执行ensurePrestart()确保线程池能启动任务
5、ThreadPoolExecutor 类中的 ensurePrestart() 方法
1 void ensurePrestart() { 2 // 获取工作线程数量 3 int wc = workerCountOf(ctl.get()); 4 // 工作线程数小于 核心线程数 5 if (wc < corePoolSize) 6 // 添加工作线程线,以核心线程池数为上限 7 addWorker(null, true); 8 else if (wc == 0) 9 // 添加工作线程线,以最大线程池线程数为上限 10 addWorker(null, false); 11 }
6、ThreadPoolExecutor 类中的 reject() 方法 拒绝任务
1 // 拒绝任务 2 final void reject(Runnable command) { 3 // 调用创建线程池时,使用的拒绝策略对象处理 4 handler.rejectedExecution(command, this); 5 }