以前线程Thread既表示执行的任务,又表示执行的机制。在JDK1.5中,java并发框架提供了一种“执行服务”的相关API,它将"任务的执行"和"任务的提交“相分离,”执行服务“封装了任务执行的细节,对于任务提交者来说,它可进一步聚焦于任务本身,如任务提交、获取任务执行后的结果、取消任务而不需要关注任务执行的细节,如线程的创建、任务的调试、线程的复用或关闭等。
任务执行服务主要涉及4个接口
-
Runnable和Callable: 表示要执行的异步任务
-
Executor和ExecutorService:表示执行任务的服务
-
Future : 表示任务执行后的结果
Runnable是我们最常用的异步任务接口,这个接口的方法没有返回值,不能抛出异常。而Callable接口就是为了解决Runnable的不足而在JDK1.5引入的接口,此接口的方法有返回值,且可抛出异常。
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
1)Executor接口
Executor接口也只有一个方法,这个方法接受一个Runnable类型的参数,这是个抽象方法,它无法指定任务该如何执行。它可能是新建一个线程执行任务,也可能是利用线程池中的一个线程,还可能是在调用者线程中执行。
public interface Executor { void execute(Runnable command); }
ExecutorService扩展了Executor接口,它添加了一些新功能,如支持有返回结果的任务、支持超时任务、支持取消任务、支持批量提交任务。这里的submit方法,返回类型是Future,返回后,只表示任务已提交,不代表已经执行,具体执行与否要看”执行服务“如何调度,通过Future可以查询异步任务的状态、获取最终的结果、取消任务等。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2)Future接口
Future接口是对任务的结果做了进一步封装,字面上”future"是未来的意思,这里确实是表示“未来(或最终)的结果”,“结果”需要等待。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
cancel()方法用于取消异步任务,若任务已完成、已取消或因其他原因不能取消等原因而导致任务取消失败就会返回false,反之返回true. 如果任务还未开始,则不再运行,但若任务已经在运行,则不一定能取消这个任务。参数mayInterruptIfRunning表示,如果任务正在执行,是否调用Thread.interrupt()方法中断线程,而interrupt()方法只是设置线程中断标志,它不一定能真的中断线程。
isCancelled() 和isDone()分别返回任务是否被取消、任务是否已完成的布尔值。只要cancel方法返回true,那么即使执行任务的线程还未结束,isCancelled方法也一定会返回true。不管什么原因,无论是任务正常结束、任务抛出异常或任务被取消,只要任务结束了,isDone都会返回true.
get()方法用于返回异步任务的结果,若任务未完成,当前线程则会阻塞待。get(long,TimeUnit)方法需要设定等待时长,若在给定的时间内还未完成任务,则会抛出TimeoutException异常。
get方法的最终结果大致有3种: ①任务正常完成,get方法返回任务的执行结果,若任务是Runnable且入参未提供结果,最终返回null ②任务被取消了,get方法会抛出CancellationException. ③任务执行过程中抛出了异常,get方法会将异常包装为ExecutionException重新抛出,此异常的getCause方法可获得原始异常。
Future是实现”任务的提交“与”任务的执行“相分离的关键,它是两者的桥梁,它使任务的提交者和任务的执行器的关注点相隔离,同时又让两者彼此联系。
2.用法示例
下面的例子中,使用异步任务,计算0-300的累加结果,在计算出结果前Future的get()方法将阻塞等待。
这里使用了Executors工具类的newSingleThreadeExecutor方法创建一了个执行服务。Executors有很多静态方法,分别创建各种线程池执行服务。
class SimpleTaskTest { static class Accumlation implements Callable<Integer> { @Override public Integer call() throws Exception { int sum =0; for (int i = 0; i < 300; i++) { sum += i; Thread.sleep(10); } return sum; } } public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> future = executor.submit(new Accumlation()); long start = System.currentTimeMillis(); int result = future.get(); System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒才返回,计算结果:" + result); } }
打印的结果
Future.get方法等待了3147毫秒才返回,计算结果:44850
3.基本原理
1)ExecutorService实现
ExecutorService是Executor的子接口,它添加了一些新功能,如支持有返回结果的任务、支持超时任务、支持取消任务、支持批量提交任务。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
①ExecutorService有两个关闭执行服务的方法,分别是shutdown()和shutdownNow. 两者关闭执行服务的方式有所差别,shutdown()方法调用后不会接受新的任务,但已提交的任务将继续执行(即使任务还未真正开始执行);而shutdownNow()方法不仅不会接受新任务,而且还会终止已经提交但未执行的任务,对于正在执行的任务,一般调用Thread.interrupt()方法设置中断标志,不过线程可能不响应中断,shutdownNow会返回已提交但未执行的任务列表。shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有任务都结束。
②isShutdown返回执行服务是否被关闭的布尔值(不会等待),只要shutdown或shutdownNow任意一方法被调用后,isShutdwon都将返回true.
③awaitTermination方法用于等待执行服务中的所有任务完成,此方法需要设置超时时间,如果在限时间内所有任务都结束了(允许非正常结束),
④isTerminated 返回在执行服务关闭后所有任务是否已完成的布尔值,如果在此之前shutdownNow或shutdown没有被调用,这里永不可能返回true.
⑤三个submit方法都用于提交单任务,submit(Callable<T> )方法中入参Callable本身有返回结果 ;submit(Runnable,T)方法在设定任务的同时可以提供一个结果,在任务结束时将返回这个结果;submit(Runnable )方法入参没有提供结果,最终返回的结果是null 。
⑥ExecutorService有两类批量提交任务的方法,invokeAll和invokeAny,它们都有两个版本,一个不限时版本、一个超时版本。
invokeAll等待(给定的)所有任务完成,返回的Future集合中,每个Future的isDone方法都返回true,但isDone是true并不代表任务完成了,也有可能是因任务被取消而导致任务非正常结束。invokeAll的超时版本方法,需要指定等待时间的时间,若超时后还有任务还未完成,这些任务就会被取消。而invokeAny,只要有一个任务正常完成(没抛出异常)后,它就返回此任务的结果;在正常返回或异常抛出返回后,其他任务则会被取消。对于invokeAny的超时版本,如果在限时内有一任务正常(没抛出异常)完成,就返回此任务的结果 ,其他将任务会被取消;如果没有任务能在限时内成功完成返回,就抛出TimeoutException; 没有任务正常成功返回(可能是因发生某种异常而返回),将抛出ExecutionException.
在了解ExecutorService接口的相关抽象方法定义后,我们来进一步分析它的实现类和实现原理。
ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,ExecutorService有一个很重要的抽象类AbstractExecutorService, 而且ThreadPoolExecutor就是直接继承于AbstractExecutorService。
我们可以基于此抽象类实现一个简易的ExecutorService。AbstractExecutorService提供了submit 、invokeAll和invokeAny的默认实现,子类只需要实现其他方法就行了。shutdown与isShutdown 等方法与生命周期管理有关,我暂时可以不用去管它,其实它的子类最关键在于实现execute方法,因为submit、invokeAll、invokeAny等方法底层主要还是调用execute方法。
import java.util.List; import java.util.concurrent.*; public class CustomizeExecutorService extends AbstractExecutorService { @Override public void shutdown() { System.out.println("=====shutdown====="); } @Override public List<Runnable> shutdownNow() { System.out.println("=====shutdownNow====="); return null; } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return false; } @Override public void execute(Runnable command) { new Thread(command).start(); } } class Accumlation implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i < 300; i++) { sum += i; Thread.sleep(10); } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = new CustomizeExecutorService(); Future<Integer> future = executor.submit(new Accumlation()); long start = System.currentTimeMillis(); int result = future.get(); System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒,计算结果:" + result); } }