期货
Futures 是在 Java 5 (2004) 中引入的。它们基本上是尚未完成的操作结果的占位符。操作完成后,Future 将包含该结果。例如,操作可以是提交给ExecutorService 的Runnable 或Callable 实例。操作的提交者可以使用Future对象来检查操作是否isDone(),或者使用阻塞的get()方法等待它完成。
例子:
/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 1;
}
}
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> f = exec.submit(new MyCallable());
System.out.println(f.isDone()); //False
System.out.println(f.get()); //Waits until the task is done, then prints 1
}
CompletableFutures
CompletableFutures 是在 Java 8 (2014) 中引入的。它们实际上是常规 Futures 的演变,灵感来自 Google 的 Listenable Futures,它是 Guava 库的一部分。它们是期货,还允许您将任务串联在一起。您可以使用它们告诉某个工作线程“去做一些任务 X,完成后,使用 X 的结果去做其他事情”。使用 CompletableFutures,您可以对操作的结果做一些事情,而无需实际阻塞线程来等待结果。这是一个简单的例子:
/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return 1;
}
}
/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {
@Override
public Integer apply(Integer x) {
return x + 1;
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
System.out.println(f.isDone()); // False
CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}
RxJava
RxJava 是在 Netflix 创建的 reactive programming 的整个库。乍一看,它似乎类似于Java 8's streams。确实如此,只是它更强大。
与 Futures 类似,RxJava 可用于将一堆同步或异步操作串在一起以创建处理管道。与一次性使用的 Futures 不同,RxJava 可以处理零个或多个项目的流。包括具有无限数量的项目的永无止境的流。得益于令人难以置信的丰富set of operators,它也更加灵活和强大。
与 Java 8 的流不同,RxJava 还具有backpressure 机制,它允许它处理处理管道的不同部分在不同线程中以不同速率运行的情况,。
RxJava 的缺点是,尽管有可靠的文档,但由于涉及到范式转换,学习它是一个具有挑战性的库。 Rx 代码也可能是调试的噩梦,尤其是在涉及多个线程的情况下,甚至更糟 - 如果需要背压。
如果你想深入了解,官网上有各种教程的完整的page,还有官方的documentation和Javadoc。您还可以观看一些视频,例如 this one,其中简要介绍了 Rx,并讨论了 Rx 和 Futures 之间的区别。
奖励:Java 9 反应式流
Java 9's Reactive Streams 又名Flow API 是一组由各种reactive streams 库实现的接口,例如RxJava 2、Akka Streams 和Vertx。它们允许这些反应式库互连,同时保留所有重要的背压。