这不是重放/缓存的工作方式。请先阅读#replay/#cache 文档。
重播
这个操作符返回一个 ConnectableObservable,它有一些方法 (#refCount/ #connect/ #autoConnect) 用于连接到源。
当#replay 被应用而没有过载时,源订阅被多播并且所有发出的值sind 连接将被重播。源订阅是惰性的,可以通过#refCount/#connect/#autoConnect连接源。
返回一个 ConnectableObservable,它共享对底层 ObservableSource 的单一订阅,该订阅将重播其所有项目和通知给任何未来的 Observer。
在没有任何连接方法的情况下应用#relay (#refCount/ #connect/ #autoConnect) 不会在订阅时发出任何值
Connectable ObservableSource 类似于普通的 ObservableSource,不同之处在于它不会在订阅时开始发射项目,而只是在调用其 connect 方法时。
重播(1)#autoConnect(-1) / #refCount(1) / #connect
应用 replay(1) 将缓存最后一个值,并将在每个订阅上发出缓存的值。 #autoConnect 将立即打开连接并保持打开状态,直到发生终端事件(onComplete、onError)。 #refCount 是 smiular,但会在所有订阅者消失时断开与源的连接。 #connect 运算符可以在您需要等待时使用,当所有对 observable 的订阅都已完成时,以免丢失值。
用法
#replay(1) -- 大部分应该用在 observable 的末尾。
sourcObs.
.filter()
.map()
.replay(bufferSize)
.refCount(connectWhenXSubsciberSubscribed)
注意
在没有缓冲区限制或到期日期的情况下应用#replay 将导致内存泄漏,当您观察到无限时
缓存/cacheWithInitialCapacity
运算符类似于带有 autoConnect(1) 的 #replay。运算符将缓存每个值并在每个订阅上重播。
仅当第一个下游订阅者订阅并维护对该 ObservableSource 的单个订阅时,操作员才订阅。相反,返回 ConnectableObservable 的操作符系列 replay() 需要显式调用 ConnectableObservable.connect()。
注意:当您使用缓存 Observer 时,您牺牲了处理源的能力,因此请注意不要在 ObservableSources 上使用此 Observer,因为该 ObservableSources 会发出无限或非常多的将耗尽内存的项目。一种可能的解决方法是在应用 cache() 之前(也可能在之后)将 takeUntil 与谓词或其他源一起应用。
示例
@Test
fun skfdsfkds() {
val create = PublishSubject.create<Int>()
val cacheWithInitialCapacity = create
.cacheWithInitialCapacity(1)
cacheWithInitialCapacity.subscribe()
create.onNext(1)
create.onNext(2)
create.onNext(3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
}
用法
使用缓存操作符,当你无法控制连接阶段时
当您希望 ObservableSource 缓存响应并且您无法控制所有观察者的订阅/处置行为时,这很有用。
注意
与 replay() 一样,缓存是无限的,可能会导致内存泄漏。
注意:容量提示不是缓存大小的上限。为此,请考虑将 replay(int) 与 ConnectableObservable.autoConnect() 或类似方法结合使用。
进一步阅读
https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/
https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/