【问题标题】:Difference between CompletableFuture, Future and RxJava's ObservableCompletableFuture、Future 和 RxJava 的 Observable 的区别
【发布时间】:2016-05-21 16:03:37
【问题描述】:

我想知道两者的区别 CompletableFuture,FutureObservable RxJava.

我所知道的是所有都是异步的,但是

Future.get() 阻塞线程

CompletableFuture 给出回调方法

RxJava Observable --- 与CompletableFuture 类似,但有其他好处(不确定)

例如:如果客户端需要进行多个服务调用,并且当我们使用Futures (Java) 时,Future.get() 将按顺序执行...想知道它在 RxJava 中的性能如何..

文档http://reactivex.io/intro.html

很难使用 Futures 来优化组合条件异步执行流(或者是不可能的,因为每个请求的延迟在运行时会有所不同)。当然,这可以做到,但它很快就会变得复杂(因此容易出错),或者它会过早地阻塞 Future.get(),从而消除了异步执行的好处。

真的很想知道RxJava 是如何解决这个问题的。我发现从文档中很难理解。

【问题讨论】:

  • 您是否阅读过每个文档?我对 RxJava 完全不熟悉,但文档一目了然。似乎与这两个未来没有特别可比性。
  • 我已经经历过,但无法理解它与 Java 期货的不同之处......如果我错了,请纠正我
  • observables 与 futures 有何相似之处?
  • 想知道它有什么不同,比如线程管理有什么不同?? EX:Future.get() 阻塞线程....在 Observable 中将如何处理???
  • 至少对我来说有点混乱……高水平的差异真的很有帮助!!

标签: java multithreading asynchronous java-8 rx-java


【解决方案1】:

期货

Futures 是在 Java 5 (2004) 中引入的。它们基本上是尚未完成的操作结果的占位符。操作完成后,Future 将包含该结果。例如,操作可以是提交给ExecutorServiceRunnableCallable 实例。操作的提交者可以使用Future对象来检查操作是否isDone(),或者使用阻塞的get()方法等待它完成。

例子:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures 是在 Java 8 (2014) 中引入的。它们实际上是常规 Futures 的演变,灵感来自 Google 的 Listenable Futures,它是 Guava 库的一部分。它们是期货,还允许您将任务串联在一起。您可以使用它们告诉某个工作线程“去做一些任务 X,完成后,使用 X 的结果去做其他事情”。使用 CompletableFutures,您可以对操作的结果做一些事情,而无需实际阻塞线程来等待结果。这是一个简单的例子:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava 是在 Netflix 创建的 reactive programming 的整个库。乍一看,它似乎类似于Java 8's streams。确实如此,只是它更强大。

与 Futures 类似,RxJava 可用于将一堆同步或异步操作串在一起以创建处理管道。与一次性使用的 Futures 不同,RxJava 可以处理零个或多个项目的。包括具有无限数量的项目的永无止境的流。得益于令人难以置信的丰富set of operators,它也更加灵活和强大。

与 Java 8 的流不同,RxJava 还具有backpressure 机制,它允许它处理处理管道的不同部分在不同线程中以不同速率运行的情况,

RxJava 的缺点是,尽管有可靠的文档,但由于涉及到范式转换,学习它是一个具有挑战性的库。 Rx 代码也可能是调试的噩梦,尤其是在涉及多个线程的情况下,甚至更糟 - 如果需要背压。

如果你想深入了解,官网上有各种教程的完整的page,还有官方的documentationJavadoc。您还可以观看一些视频,例如 this one,其中简要介绍了 Rx,并讨论了 Rx 和 Futures 之间的区别。

奖励:Java 9 反应式流

Java 9's Reactive Streams 又名Flow API 是一组由各种reactive streams 库实现的接口,例如RxJava 2Akka StreamsVertx。它们允许这些反应式库互连,同时保留所有重要的背压。

【讨论】:

  • 如果能给出 Rx 是如何做到这一点的示例代码会很高兴
  • @IgorGanapolsky 是的。
  • 在 CompletableFutures 中我们使用回调方法,如果一个方法的输出是另一个回调的输入,这些回调方法也会阻塞。作为 Future.get() 调用的未来块。为什么说 Future.get() 阻塞调用而 CompletableFutures 不阻塞。请解释
  • @Federico 当然。每个Future 都是单个结果的占位符,该结果可能尚未完成,也可能尚未完成。如果您再次执行相同的操作,您将获得一个新的Future 实例。 RxJava 处理可能随时出现的结果。因此,一系列操作可以返回一个 RxJava observable,它会输出一堆结果。这有点像单个邮政信封和不断抽出邮件的气动管之间的区别。
  • @srk 是的。它阻塞直到计算完成。
【解决方案2】:

我从 0.9 开始使用 Rx Java,现在是 1.3.2,并很快迁移到 2.x。我在一个已经工作了 8 年的私人项目中使用它。

如果没有这个库,我再也不会编程了。一开始我持怀疑态度,但这是你需要创造的一种完全不同的心态。刚开始很困难。我有时会盯着弹珠看几个小时……哈哈

这只是一个实践问题,并且真正了解流程(也称为可观察对象和观察者的合同),一旦你到达那里,你就会讨厌这样做。

对我来说,那个库并没有真正的缺点。

用例: 我有一个包含 9 个仪表(cpu、mem、网络等)的监视器视图。当启动视图时,视图将自己订阅到一个系统监视器类,该类返回一个包含 9 米的所有数据的可观察(间隔)。 它将每秒向视图推送一个新结果(所以不要轮询!!!)。 该 observable 使用平面图同时(异步!)从 9 个不同的来源获取数据,并将结果压缩到您的视图将在 onNext() 上获取的新模型中。

你怎么会用期货、可完成品等来做这件事……祝你好运! :)

Rx Java 为我解决了编程中的许多问题,并且让我变得更容易......

优点:

  • 无状态!!! (重要的是要提,也许是最重要的)
  • 开箱即用的线程管理
  • 构建具有自己生命周期的序列
  • 一切都是可观察的,因此链接很容易
  • 编写的代码更少
  • 类路径上的单个 jar(非常轻量级)
  • 高并发
  • 不再有回调地狱
  • 基于订阅者(消费者和生产者之间的紧密合同)
  • 背压策略(断路器等)
  • 出色的错误处理和恢复
  • 非常好的文档(大理石
  • 完全控制
  • 还有更多...

缺点: - 难以测试

【讨论】:

  • ~"如果没有这个库,我再也不会编程了。" 所以 RxJava 是所有软件项目的终极目标?
  • 即使我没有异步事件流也有用吗?
【解决方案3】:

所有三个接口都用于将值从生产者传输到消费者。消费者可以有两种:

  • 同步:消费者进行阻塞调用,当值准备好时返回
  • 异步:当值准备好时,调用消费者的回调方法

此外,通信接口在其他方面也有所不同:

  • 能够传输多个值的单个值
  • 如果有多个值,可以支持或不支持背压

结果:

  • Future 使用同步接口传输单个值

  • CompletableFuture 使用同步和异步接口传输单个值

  • Rx 使用带背压的异步接口传输多个值

此外,所有这些通信工具都支持传输异常。这并非总是如此。例如,BlockingQueue 没有。

【讨论】:

    【解决方案4】:

    CompletableFuture 相对于普通 Future 的主要优势在于 CompletableFuture 利用了极其强大的流 API 并为您提供了回调处理程序来链接您的任务,如果您使用普通 Future,这是绝对不存在的。除了提供异步架构外,CompletableFuture 还是处理计算繁重的 map-reduce 任务的理想之选,无需过多担心应用程序的性能。

    【讨论】:

      【解决方案5】:

      Java 的Future 是一个占位符,用于保存将来使用阻塞 API 完成的内容。您必须使用它的isDone() 方法定期轮询它以检查该任务是否完成。当然,您可以实现自己的异步代码来管理轮询逻辑。但是,它会产生更多样板代码和调试开销。

      Java 的 CompletableFuture 由 Scala 的 Future 创新。它带有一个内部回调方法。一旦完成,将触发回调方法并告诉线程应该执行下游操作。这就是为什么它有thenApply 方法对包裹在CompletableFuture 中的对象做进一步的操作。

      RxJava 的ObservableCompletableFuture 的增强版。它允许您处理背压。在我们上面提到的thenApply 方法(甚至还有它的兄弟thenApplyAsync)中,可能会发生这种情况:下游方法想要调用有时可能不可用的外部服务。在这种情况下,CompleteableFuture 将完全失败,您必须自己处理错误。但是,Observable 允许您处理背压并在外部服务可用后继续执行。

      另外还有Observable的类似接口:Flowable。它们是为不同的目的而设计的。通常Flowable 专用于处理冷操作和非定时操作,而Observable 专用于处理需要即时响应的执行。官方文档看这里:https://github.com/ReactiveX/RxJava#backpressure

      【讨论】:

        猜你喜欢
        • 2017-08-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-10-24
        • 2018-09-14
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多