zyxs

这两天学的线程池归纳

1 线程池 ---》接口 Executor

2 线程池 ---》接口 ExecutorService

final ExecutorService poolExecutor = Executors.newFixedThreadPool(threadNum);

poolExecutor.submit(new ConvertOne(latch,seperateMap,dbname));

poolExecutor.shutdown();
shutdown()方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务的启动并试图停止当前正在执行的任务。在终止后,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService以允许回收其资源。
例子:网络服务的简单结构


3 线程池 ---》类 ThreadPoolExecutor
由于内容较多没有一一研究,只看了较常用的 ThreadPoolExecutor ,所以在这里做个介绍。ThreadPoolExecutor 的继承关系如下。
Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor
已实现的接口: Executor, ExecutorService

每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但是,强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(*线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程)。

关于ThreadPoolExecutor的 execute方法:
public void execute(Runnable command)
这里给出源码:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是一个重要的变量,主要包装两个重要的概念:一是workerCount:effective number of threads,二是runState:  indicating whether running, shutting down 

RUNNING可以接受新的task,并且可以处理queue中的task,SHUTDOWN不可以接受新的task,但是可以处理queue中的task,其他的全都不可以。

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { 判断线程数是否小于corePoolSize。否则进入②
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //② 判断线程是否是running状态,并且阻塞队列的容量没有达到上限
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);//如果此处状态不是RUNNING,也不是SHUTDOWN,或者removeFirstOccurrence(队列中移除第一个阻塞的任务,返回ture)那么,则拒绝任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//由于任务放到了BlockingQueue中,此处,在Worker中,不添加task,而是运行任务时,从queue取出task
}
else if (!addWorker(command, false))//除了以上情况以外,比如BlockingQueue饱和了,线程池容量也饱和了,执行饱和策略,默认为AbortPolicy,拒绝任务
reject(command);
}

  

常用两个阶段来关闭ExecutorServise
( 1 shutdown 拒绝传入任务 ,当所有已提交任务执行完后,就关闭。如果已经关闭,则调用没有其他作用。 抛出:
        SecurityException - 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程。
(2 在!pool.awaitTermination(60, TimeUnit.SECONDS) 任务还没有执行完,取消所有遗留的任务 shutdownNow

关于构造的参数 corePoolSize ,maximumPoolSize的说明
注意1:在新任务被提交时,如果运行的core线程少于corePoolSize,才创建新core线程。并不是一开始就创建corePoolSize个core线程。
注意2:"如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程"
注意3:如果两个相同则固定大小的线程池(这时候可以用blockingQueue 进行排队,用来传输和保持提交的任务,使用队列与线程池的大小进行交互)
注意4: maximumPoolSize设置成 Integer.MAX_VALUE,则允许使用任何数量的并发任务。*
注意5:核心线程即core线程,只有当前线程数小于等于corePoolSize时,这时的线程才叫核心线程。

构造函数:

(1)public ThreadPoolExecutor(int corePoolSize, //池中所保存的线程数,包括空闲线程。
                          int maximumPoolSize,//池中允许的最大线程数。
                          long keepAliveTime,//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
                          TimeUnit unit,//参数的时间单位。
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

使用ThreadFactory创建新线程,如果没有另外说明,则使用 Executors.defaultThreadFactory() 创建线程,他们在同一个ThreadGroup中, 并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。

(2)public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

使用workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 
 线程池所使用的缓冲队列,改缓冲队列的长度决定了能够缓冲的最大数量。
拒绝任务:拒绝任务是指当线程池里面的线程数量达到 maximumPoolSize 且 workQueue 队列已满的情况下被尝试添加进来的任务。

(3)public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
使用handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 

关于线程处于非活动状态如何减少资源消耗:【空闲线程的回收】
注意1:setKeepAliveTime(long, java.util.concurrent.TimeUnit)用于设置空闲线程最长的活动时间,
    即如果空闲时间超过设定值,就停掉该线程,对该线程进行回收。
    该策略默认只对非内核线程有用(即当前线程数大于corePoolSize),
    可以调用allowCoreThreadTimeOut(boolean)方法将此超时策略扩大到核心线程。 设置适当保持活动时间,使用0核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
注意2:如果把值设为Long.MAX_VALUE TimeUnit.NANOSECONDS的话,空闲线程不会被回收直到ThreadPoolExecutor为Terminate。

关于blockingQueue 进行排队与线程池大小进行交互:

为什么要与线程池大小进行交互,用于传输和保持提交的任务,
(1、 corePoolSize, maximumPoolSize的设置
    * 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
         * 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
         * 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
(2、排队的三种通用策略
  直接提交【 默认选项 synchronousQueue 】此策略允许线程*的增长。
任务直接提交给线程,而不保持它们。
如果不存在立即运行任务的线程,会构造新的线程,通常需要设置maximumPoolSize*,否则无法创建新线程,导致拒绝新任务的提交。
  *队列【例如,没有预定容量的LinkedBlockingQueue】此策略适合web服务器类型,每个任务独立于其他任务。任务互不影响。允许队列无线的增长,适合处理瞬态突发请求。
创建的线程永远不会超过corePoolSize 但是maximumPoolSize就失效了。
  有界队列【例如,ArrayBlocking】
 队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
 使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。Queue】此策略有助于防止资源耗尽,但是可能较难调整和控制。

Java 线程池 ThreadPoolExecutor.的拒绝策略

在 ThreadPoolExecutor 里面定义了 4 种 handler 策略,分别是
(1)CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降。
(2)AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。(默认的)RejectedExecutionException
(3)DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。
(4)DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
关于这个给出源码:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}

扩展RejectedExecutioHandler接口,自定义拒绝策略

先看下RejectedExecutionHandler接口吧:
public interfaceRejectedExecutionHandler{
    voidrejectedExecution(Runnable r,ThreadPoolExecutor executor);
}
再举个例子吧:

 1 public classTestRejectHandler {
 2     class MyTask implements Runnable{
 3        @Override
 4        public void run() {
 5            System.out.println("Thread ID:"+Thread.currentThread().getId());
 6            try{
 7               Thread.sleep(1000);
 8            }catch(InterruptedException e){
 9               e.printStackTrace();
10            }
11        }
12     }
13     public void test(){
14        ThreadPoolExecutor executor= newThreadPoolExecutor(5, 5,
15               0L, TimeUnit.MILLISECONDS,
16               newLinkedBlockingQueue(10),
17               Executors.defaultThreadFactory(),
18               newRejectedExecutionHandler() {  
19                   @Override
20                   public voidrejectedExecution(Runnable r, ThreadPoolExecutor executor) {
21                      System.out.println(r.toString()+" 被抛弃了");
22                   }
23               });
24        MyTask task= newMyTask();
25        for(int i=0;i<20;i++){
26            executor.submit(task);
27        }
28        executor.shutdown();
29     }


最后给出一个简单的利用线程池的例子

public class ThreadPoolTermination {
	public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>(10), new SolrjNamedThreadFactory("Termination"),
			new ThreadPoolExecutor.CallerRunsPolicy());
	//SolrjNamedThreadFactory监控线程
	//CallerRunsPolicy 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序

	static class TestThread implements Runnable {
		int i;
		public TestThread(int i) {
			this.i = i;
		}
		@Override
		public void run() {
			try {
				String thrdName = Thread.currentThread().getName();
				//dosomethind-------------这里写线程需要做的事
				System.out.println("当前线程:"+thrdName+"   "+i);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		for (int i = 0; i < 200; i++) {
			threadPool.execute(new TestThread(i));
		}
		/*下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后等10秒后,
		任务还没执行完成,就调用 shutdownNow(如有必要)取消所有遗留的任务*/
		threadPool.shutdown();
		try {
			while (!threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
				System.out.println("waiting for thread pool to terminate...");
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			threadPool.shutdownNow();
		}
		System.out.println("done");
	}
}

  总结一下:

一个任务通过 execute(Runnable) 方法被添加到线程池,任务就是一个 Runnable 类型的对象,任务的执行方法就是 Runnable 类型对象的 run() 方法。

当一个任务通过 execute(Runnable) 方法欲添加到线程池时,线程池采用的策略如下:

1. 如果此时线程池中的数量小于 corePoolSize ,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

2. 如果此时线程池中的数量等于 corePoolSize ,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。

3. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量小于maximumPoolSize ,建新的线程来处理被添加的任务。

4. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量等于maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。

处理任务的优先级为:

核心线程 corePoolSize 、任务队列 workQueue 、最大线程 maximumPoolSize ,如果三者都满了,使用 handler处理被拒绝的任务。当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。

posted on 2018-04-13 14:55 张先生~ 阅读(...) 评论(...) 编辑 收藏

相关文章: