CompletableFuture是对Future的一种强有力的扩展,Future只能通过轮询isDone()方法或者调用get()阻塞等待获取一个异步任务的结果,才能继续执行下一步,当我们执行的异步任务很多,而且相互之前还要依赖结果的时候,可能会创建很多这样的Future,并通过get或者轮询等待执行结果返回之后继续执行,这样的代码显得很不方便而且也不高效。
通过前面的CompletionStage接口给我们提供了一系列将多个阶段(甚至是异步的)的结果相互关联执行的方法,如果把它和Future结合起来,那么可将这种便利与高效编程方式用于异步任务的执行。CompletableFuture就是这样的一个类,同时继承了CompletionStage和Future,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过lambda表达式的风格处理各个执行阶段的结果。
实现
CompletableFuture通过以下策略实现了接口CompletionStage:
- 依赖的非异步阶段提供的操作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。
- 所有没有显式指定Executor参数的异步方法都使用ForkJoinPool.commonPool执行(除非它不支持至少两个并行级别,否则将创建一个新线程来运行每个任务).为了简化监视、调试和跟踪,所有生成的异步任务都是CompletableFuture.AsynchronousCompletionTask的实例。
- 所有实现CompletionStage的接口方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法重写的影响。
CompletableFuture通过以下策略实现了接口Future:
- CompletableFuture将取消看作是异常完成的另一种形式。cancel方法具有与completeExceptionally(new CancellationException())相同的效果。
- 在以CompletionException异常完成的情况下,get()和get(long, TimeUnit)方法抛出一个ExecutionException,其异常原因与对应的CompletionException中的原因相同。为了简化在大多数上下文中的使用,这个类还定义了join()和getNow方法,它们在这种情况下直接抛出CompletionException。
以下是CompletableFuture的内部实现概述:
由于CompletableFuture可以依赖其他一个甚至多个CompletableFuture,所以在内部实现的时候,每一个CompletableFuture都拥有一个依赖操作栈,栈中的元素是Completion的子类,它包含相关的操作、CompletableFuture以及源操作。当一个CompletableFuture完成之后会从栈中弹出并递归执行那些依赖它的CompletableFuture。由于依赖栈中的那些Completion元素也包含CompletableFuture对象,其CompletableFuture对象可能也拥有一个依赖栈,因此将形成一个非常复杂的依赖树。
CompletableFuture对每一种形式的实现使用了不同的Completion子类,例如:单输入(UniCompletion)、双输入(BiCompletion)、投影(使用BiCompletion两个输入中的任何一个(而不是两个)的双输入)、共享(CoCompletion,由两个源中的第二个使用)、零输入(不消费不产出的Runnable)操作和解除阻塞等待(get()、join()方法)的信号器Signallers。Completion类扩展了ForkJoinTask来启用异步执行(不增加空间开销,因为我们利用它的“标记”方法来维护声明).它还被声明为Runnable,可以被任意线程池调度执行。
CompletableFuture又在UniCompletion、BiCompletion、CoCompletion等这几种Completion子类的基础上扩展出了实现CompletionStage具体接口方法的前缀为"Uni", "Bi", "Or"的子类。例如实现单个输入、两个输入、两者之一的thenApply对应的就是UniApply、BiApply、OrApply。CompletableFuture在实现CompletionStage接口方法甚至自己独有的方法使都采用了相同的模式,以及调度策略,因此只要立即了一种方法的实现,其他方法都是类似的原理。
源码简述
runAsync/supplyAsync
虽然CompletableFuture提供了无参的构造方法,但我们一般从它的静态方法开始,根据是否有返回值,它对外提供了两种形式的执行异步任务的方法:
1 //执行无返回值的异步任务 2 public static CompletableFuture<Void> runAsync(Runnable runnable) 3 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 4 5 //执行有返回值的异步任务 6 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
它们都以async为后缀,根据CompletionStage的接口定义规律也可以知道是通过异步安排执行,又比如方法中带有run表示不消费也不产出型方法,再如,参数带有Executor的用自定义的线程池调度执行,否则使用默认的ForkJoinPool.commonPool执行。对于不支持并行运算的环境,例如单核CPU,CompletableFuture默认将采用一个任务创建一个Thread实例的方式执行。
我们以supplyAsync(Supplier<U> supplier)方法为例,继续向下分析:
1 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 2 return asyncSupplyStage(asyncPool, supplier); 3 } 4 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 5 Supplier<U> f) { 6 if (f == null) throw new NullPointerException(); 7 CompletableFuture<U> d = new CompletableFuture<U>(); //新创建一个CompletableFuture 8 e.execute(new AsyncSupply<U>(d, f)); //安排异步执行 9 return d; //立即返回 10 }