制作一个自定义的OnSubscribe 实现来满足您的需求:
public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {
private final AtomicBoolean refresh = new AtomicBoolean(true);
private final Observable<T> source;
private volatile Observable<T> current;
public OnSubscribeRefreshingCache(Observable<T> source) {
this.source = source;
this.current = source;
}
public void reset() {
refresh.set(true);
}
@Override
public void call(Subscriber<? super T> subscriber) {
if (refresh.compareAndSet(true, false)) {
current = source.cache();
}
current.unsafeSubscribe(subscriber);
}
}
这段代码演示了用法并表明缓存实际上正在被重置:
Observable<Integer> o = Observable.just(1)
.doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher =
new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);
输出:
completed
1
1
completed
1
顺便说一句,您可能会注意到 .cache 直到完成才会发出。这是一个应该由 rxjava 1.0.14 修复的错误。
就您的 GC 压力问题而言,每个运算符在应用于 Observable 时通常通过 lift 或 create 创建一个新的 Observable。与创建新 Observable 相关的基本成员状态是对 onSubscribe 函数的引用。 cache 与大多数不同之处在于它跨订阅保存状态,如果它保存大量状态并且经常被丢弃,这可能会产生 GC 压力。即使您使用相同的可变数据结构在重置时保持状态,GC 仍然必须在清除时处理数据结构的内容,因此您可能不会获得太多收益。
RxJava cache 运算符是为多个并发订阅而构建的。您可能会想象重置功能可能难以实现。如果您想进一步探索,请务必在 RxJava github 上提出问题。