摘要: 原创出处 http://www.iocoder.cn/RxJava/blocking-observable/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 RxJava 1.2.X 版本
本系列写作目的,为了辅助 Hystrix 的理解,因此会较为零散与琐碎,望见谅见谅。
《ReactiveX/RxJava文档中文版 —— 阻塞操作》
BlockingObservable 的方法不是将一个 Observable 变换为另一个,也不是过滤Observables,它们会打断 Observable 的调用链,会阻塞等待直到 Observable 发射了想要的数据,然后返回这个数据(而不是一个 Observable )。
1. toBlocking
调用 Observable#toBlocking() 或 BlockingObservable#from(Observable) 方法,将 Observable 转换成 BlockingObservable 。代码如下:
|
- 从代码上我们可以看到,BlockingObservable 并未将 Observable 转换成新的,而是简单的包了一层。
2. toFuture
《ReactiveX/RxJava文档中文版 —— TO》#toFuture() 操作符也是只能用于 BlockingObservable 。这个操作符将Observable 转换为一个返回单个数据项的 Future 。
- 如果原始 Observable 发射多个数据项,Future会收到一个IllegalArgumentException;
- 如果原始 Observable 没有发射任何数据,Future会收到一个NoSuchElementException。
如果你想将发射多个数据项的 Observable 转换为 Future ,可以这样用:myObservable.toList().toBlocking().toFuture() 。
点击链接 查看 #toFuture() 的代码实现:
- 通过向传入 Observable 订阅 Subscriber ,打断 Observable 的调用链,会阻塞等待直到 Observable 发射了想要的数据。
-
#onNext()方法,设置执行的返回值(value)。 -
#onCompleted()方法,CountDownLatch (finished) 减一。 -
#onError()方法,设置执行时发生的异常(error),并 CountDownLatch (finished) 减一。
-
- 返回的 Future ,通过 CountDownLatch (
error) 判断是否执行完成;通过value,error获得执行的结果。