ScheduledThreadPoolExecutor,它是一个计划任务线程池,可以执行定时任务或者是计划任务。

  ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,需要了解 ThreadPoolExecutor 的原理,参考:【Java多线程】线程池ThreadPoolExecutor实现原理(二十二) 

  ScheduledThreadPoolExecutor 中的任务队列使用了 阻塞式延迟队列 (DelayedWorkQueue),参考:【Java多线程】DelayQueue源码分析 (二十六) 

  了解以上2个知识点之后,能更好的理解ScheduledThreadPoolExecutor的原理

  参考:【Java多线程】ScheduledThreadPoolExecutor详解(二十八) 

  本章是对其进行补充

一、ScheduledThreadPoolExecutor原理图

  【Java多线程】ScheduledThreadPoolExecutor实现原理(二十九)

  任务传递示意图

  【Java多线程】ScheduledThreadPoolExecutor实现原理(二十九)

二、属性

  由于 ScheduledThreadPoolExecutor 是继承了 ThreadPoolExecutor,ThreadPoolExecutor有的属性它都有,以下是另外增加的属性

 1 public class ScheduledThreadPoolExecutor
 2         extends ThreadPoolExecutor
 3         implements ScheduledExecutorService {
 4 
 5     // shutdown后继续存在周期任务
 6     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
 7 
 8     // shutdown后执行存在延迟任务
 9     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
10 
11     // 取消状态下移除标识
12     private volatile boolean removeOnCancel = false;
13 
14     // 序列器
15     private static final AtomicLong sequencer = new AtomicLong();
16 
17     ......
18 }

三、方法

  在ScheduledThreadPoolExecutor线程池提交任务执行,主要使用的ScheduledThreadPoolExecutor.java、ThreadPoolExecutor.java、ScheduledFutureTask.java、FutureTask.java、DelayedWorkQueue.java等相关类方法,如下:

1、构造方法

 1 // 根据核心线程数创建 ScheduledThreadPool 线程池
 2 public ScheduledThreadPoolExecutor(int corePoolSize) {
 3     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 4           new DelayedWorkQueue());
 5 }
 6 
 7 // 根据核心线程数、线程工厂创建 ScheduledThreadPool 线程池
 8 public ScheduledThreadPoolExecutor(int corePoolSize,
 9                                    ThreadFactory threadFactory) {
10     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
11           new DelayedWorkQueue(), threadFactory);
12 }
13 
14 // 根据核心线程数、拒绝策略创建 ScheduledThreadPool
15 public ScheduledThreadPoolExecutor(int corePoolSize,
16                                    RejectedExecutionHandler handler) {
17     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
18           new DelayedWorkQueue(), handler);
19 }
20 
21 // 根据核心线程数、线程工厂、拒绝策略创建 ScheduledThreadPool 线程池
22 public ScheduledThreadPoolExecutor(int corePoolSize,
23                                    ThreadFactory threadFactory,
24                                    RejectedExecutionHandler handler) {
25     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
26           new DelayedWorkQueue(), threadFactory, handler);
27 }

2、schedule() 方法

 1 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
 2                                        long delay,
 3                                        TimeUnit unit) {
 4     if (callable == null || unit == null)
 5         throw new NullPointerException();
 6     // 新建一个 RunnableScheduledFuture 对象
 7     RunnableScheduledFuture<V> t = decorateTask(callable,
 8         new ScheduledFutureTask<V>(callable,
 9                                    triggerTime(delay, unit)));
10 
11     // 延迟执行任务
12     delayedExecute(t);
13     return t;
14 }

  由上可以看出,主要是新建了一个ScheduledFutureTask对象,然后点用延迟执行任务方法 delayedExecute(t)

3、decorateTask() 方法

1 // 包装任务成一个 RunnableScheduledFuture 对象
2 // 实际就是返回了task对象
3 protected <V> RunnableScheduledFuture<V> decorateTask(
4     Callable<V> callable, RunnableScheduledFuture<V> task) {
5     return task;
6 }

  返回任务对象

4、delayedExecute() 方法

 1 private void delayedExecute(RunnableScheduledFuture<?> task) {
 2     // 线程池状态是否 Shutdown
 3     if (isShutdown())
 4         // 拒绝任务
 5         reject(task);
 6     // 
 7     else {
 8         // 获取任务队列,添加任务
 9         super.getQueue().add(task);
10         // isShutdown() 再次判断任务状态
11         // canRunInCurrentRunState() 是否在当前状态下能运行
12         // remove() 移除任务
13         if (isShutdown() &&
14             !canRunInCurrentRunState(task.isPeriodic()) &&
15             remove(task))
16             // 任务取消
17             task.cancel(false);
18         else
19             // 确保线程池能启动任务
20             ensurePrestart();
21     }
22 }

  主要是将任务添加到队列中,然后执行ensurePrestart()确保线程池能启动任务

5、ThreadPoolExecutor 类中的 ensurePrestart() 方法

 1 void ensurePrestart() {
 2     // 获取工作线程数量
 3     int wc = workerCountOf(ctl.get());
 4     // 工作线程数小于 核心线程数
 5     if (wc < corePoolSize)
 6         // 添加工作线程线,以核心线程池数为上限
 7         addWorker(null, true);
 8     else if (wc == 0)
 9         // 添加工作线程线,以最大线程池线程数为上限
10         addWorker(null, false);
11 }

6、ThreadPoolExecutor 类中的 reject() 方法 拒绝任务

1 // 拒绝任务
2 final void reject(Runnable command) {
3     // 调用创建线程池时,使用的拒绝策略对象处理
4     handler.rejectedExecution(command, this);
5 }
View Code

相关文章:

  • 2021-07-30
  • 2021-12-29
  • 2022-01-03
  • 2022-01-06
  • 2021-08-12
  • 2021-07-14
  • 2022-02-10
  • 2021-08-10
猜你喜欢
  • 2021-09-08
  • 2022-02-06
  • 2021-12-22
  • 2022-01-25
  • 2021-09-11
  • 2021-07-25
相关资源
相似解决方案