并发编程中,我们通常会用到一组非阻塞的模型:Future 和 Callback、Promise。其中的 Future 表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而Promise交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。 这一套模型是很多异步非阻塞架构的基础。
JDK中实现了Future和CallBack的模式,并没有Promise以及实现。Netty有类似“调用者和执行部件以异步的方式交互通信结果”的需求(要知道eventloop本质上是一个ScheduledExecutorService,ExecutorService是一种“提交-执行”模型实现,也存在线程间异步方式通信和线程安全问题),所以Netty实现了一套完整的Futre 和 Callback、Promise架构。
关于Future以及FutureTask的原理请见并发编程之Future源码解析
1. Future&Promise
也许你已经使用过JDK的Future对象,该接口的方法如下:
// 取消异步操作
boolean cancel(boolean mayInterruptIfRunning);
// 异步操作是否取消
boolean isCancelled();
// 异步操作是否完成,正常终止、异常、取消都是完成
boolean isDone();
// 阻塞直到取得异步操作结果
V get() throws InterruptedException, ExecutionException;
// 同上,但最长阻塞时间为timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
接口中只有isDone()方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK文档指出如果任务正在执行过程中返回false,正常终止、抛出异常、用户取消都会使isDone()方法返回true,其实这是JDK接口的约定。在我们的使用中,我们极有可能是对这三种情况分别处理,而JDK这样的设计不能满足我们的需求。
对于一个异步操作,我们更关心的是这个异步操作触发或者结束后能否再执行一系列动作。Netty扩展了JDK的Future接口,使其能解决上面的问题。扩展的方法如下:
// 异步操作完成且正常终止
boolean isSuccess();
// 异步操作是否可以取消
boolean isCancellable();
// 异步操作失败的原因
Throwable cause();
// 添加一个监听者,异步操作完成时回调,类比javascript的回调函数
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到异步操作完成
Future<V> await() throws InterruptedException;
// 同上,但异步操作失败时抛出异常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回异步结果,如果尚未完成返回null
V getNow();
Netty中future的特点:
- 操作状态分为success,fail,canceled三种。
- 并且通过addlisteners()方法可以添加回调操作,即异常、取消触发或者完成时需要进行的操作,类似于js中的callBack()。
- await()和sync(),可以以阻塞的方式等待异步完成;getnow()可以获得异步操作的结果,如果还未完成则返回Null。
future只有两种执行结果状态,unconpleted和conpleted,每种状态对应方法的返回值如下图
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------------------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
Future接口中的方法都是getter方法而没有setter方法,也就是说这样实现的Future子类的状态是不可变的,如果我们想要变化,那该怎么办呢?Netty提供的解决方法是:使用可写的Future即Promise。Promise接口扩展的方法如下:
// 标记异步操作结果为成功,如果已被设置(不管成功还是失败)则抛出异常IllegalStateException
Promise<V> setSuccess(V result);
// 同上,只是结果已被设置时返回False
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置结果为不可取消,结果已被取消返回False
boolean setUncancellable();
需要注意的是:Promise接口继承自Future接口,它提供的setter方法与常见的setter方法大为不同。Promise从Uncompleted–>Completed的状态转变有且只能有一次,也就是说setSuccess和setFailure方法最多只会成功一个,此外,在setSuccess和setFailure方法中会通知注册到其上的监听者。
至此,我们从总体上了解了Future和Promise的原理。我们再看一下类图: