CompletableFuture是对Future的一种强有力的扩展,Future只能通过轮询isDone()方法或者调用get()阻塞等待获取一个异步任务的结果,才能继续执行下一步,当我们执行的异步任务很多,而且相互之前还要依赖结果的时候,可能会创建很多这样的Future,并通过get或者轮询等待执行结果返回之后继续执行,这样的代码显得很不方便而且也不高效。

通过前面的CompletionStage接口给我们提供了一系列将多个阶段(甚至是异步的)的结果相互关联执行的方法,如果把它和Future结合起来,那么可将这种便利与高效编程方式用于异步任务的执行。CompletableFuture就是这样的一个类,同时继承了CompletionStage和Future,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过lambda表达式的风格处理各个执行阶段的结果。

实现

CompletableFuture通过以下策略实现了接口CompletionStage:

  1. 依赖的非异步阶段提供的操作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。
  2. 所有没有显式指定Executor参数的异步方法都使用ForkJoinPool.commonPool执行(除非它不支持至少两个并行级别,否则将创建一个新线程来运行每个任务).为了简化监视、调试和跟踪,所有生成的异步任务都是CompletableFuture.AsynchronousCompletionTask的实例。
  3. 所有实现CompletionStage的接口方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法重写的影响。

CompletableFuture通过以下策略实现了接口Future:

  1. CompletableFuture将取消看作是异常完成的另一种形式。cancel方法具有与completeExceptionally(new CancellationException())相同的效果。
  2. 在以CompletionException异常完成的情况下,get()和get(long, TimeUnit)方法抛出一个ExecutionException,其异常原因与对应的CompletionException中的原因相同。为了简化在大多数上下文中的使用,这个类还定义了join()和getNow方法,它们在这种情况下直接抛出CompletionException。

以下是CompletableFuture的内部实现概述:

由于CompletableFuture可以依赖其他一个甚至多个CompletableFuture,所以在内部实现的时候,每一个CompletableFuture都拥有一个依赖操作栈,栈中的元素是Completion的子类,它包含相关的操作、CompletableFuture以及源操作。当一个CompletableFuture完成之后会从栈中弹出并递归执行那些依赖它的CompletableFuture。由于依赖栈中的那些Completion元素也包含CompletableFuture对象,其CompletableFuture对象可能也拥有一个依赖栈,因此将形成一个非常复杂的依赖树。

CompletableFuture对每一种形式的实现使用了不同的Completion子类,例如:单输入(UniCompletion)、双输入(BiCompletion)、投影(使用BiCompletion两个输入中的任何一个(而不是两个)的双输入)、共享(CoCompletion,由两个源中的第二个使用)、零输入(不消费不产出的Runnable)操作和解除阻塞等待(get()、join()方法)的信号器Signallers。Completion类扩展了ForkJoinTask来启用异步执行(不增加空间开销,因为我们利用它的“标记”方法来维护声明).它还被声明为Runnable,可以被任意线程池调度执行。

CompletableFuture又在UniCompletion、BiCompletion、CoCompletion等这几种Completion子类的基础上扩展出了实现CompletionStage具体接口方法的前缀为"Uni", "Bi", "Or"的子类。例如实现单个输入、两个输入、两者之一的thenApply对应的就是UniApply、BiApply、OrApply。CompletableFuture在实现CompletionStage接口方法甚至自己独有的方法使都采用了相同的模式,以及调度策略,因此只要立即了一种方法的实现,其他方法都是类似的原理。

源码简述

runAsync/supplyAsync

虽然CompletableFuture提供了无参的构造方法,但我们一般从它的静态方法开始,根据是否有返回值,它对外提供了两种形式的执行异步任务的方法:

1 //执行无返回值的异步任务
2 public static CompletableFuture<Void> runAsync(Runnable runnable)
3 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 
4 
5 //执行有返回值的异步任务
6 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

它们都以async为后缀,根据CompletionStage的接口定义规律也可以知道是通过异步安排执行,又比如方法中带有run表示不消费也不产出型方法,再如,参数带有Executor的用自定义的线程池调度执行,否则使用默认的ForkJoinPool.commonPool执行。对于不支持并行运算的环境,例如单核CPU,CompletableFuture默认将采用一个任务创建一个Thread实例的方式执行。

我们以supplyAsync(Supplier<U> supplier)方法为例,继续向下分析:

 1 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
 2     return asyncSupplyStage(asyncPool, supplier);
 3 }
 4 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
 5                                                  Supplier<U> f) {
 6     if (f == null) throw new NullPointerException();
 7     CompletableFuture<U> d = new CompletableFuture<U>(); //新创建一个CompletableFuture
 8     e.execute(new AsyncSupply<U>(d, f)); //安排异步执行
 9     return d; //立即返回
10 }
View Code

相关文章:

  • 2022-12-23
  • 2022-03-06
  • 2021-10-11
  • 2020-03-30
  • 2021-06-09
  • 2021-12-18
  • 2022-12-23
猜你喜欢
  • 2021-11-02
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-22
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案