【问题标题】:RxJava Observable.cache invalidateRxJava Observable.cache 无效
【发布时间】:2015-07-30 20:29:33
【问题描述】:

我正在尝试在 Android 环境中学习 rxjava。 假设我有一个发出网络调用结果的可观察对象。 如果我理解正确,处理配置更改的一种广泛常见的方法是:

  • 将可观察对象存储在保留的片段/单例/应用程序对象中

  • cache 运算符应用于 observable

  • 在适当的生命周期处理程序中订阅/取消订阅

这样做,我们不会丢失 observable 的结果,一旦新的配置发生,它将重新观察。

现在,我的问题是:

有没有办法强制 observable 发出一个新值(并使缓存的值无效)?每次我想要来自网络的新数据时,我是否需要创建一个新的 observable(这在 android 世界中听起来不是一个坏习惯,因为会使 gc 做额外的工作)?

非常感谢,

费德里科

【问题讨论】:

    标签: android rx-java


    【解决方案1】:

    制作一个自定义的OnSubscribe 实现来满足您的需求:

    public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {
    
        private final AtomicBoolean refresh = new AtomicBoolean(true);
        private final Observable<T> source;
        private volatile Observable<T> current;
    
        public OnSubscribeRefreshingCache(Observable<T> source) {
            this.source = source;
            this.current = source;
        }
    
        public void reset() {
            refresh.set(true);
        }
    
        @Override
        public void call(Subscriber<? super T> subscriber) {
            if (refresh.compareAndSet(true, false)) {
                current = source.cache();
            }
            current.unsafeSubscribe(subscriber);
        }
    
    }
    

    这段代码演示了用法并表明缓存实际上正在被重置:

    Observable<Integer> o = Observable.just(1)
            .doOnCompleted(() -> System.out.println("completed"));
    OnSubscribeRefreshingCache<Integer> cacher = 
        new OnSubscribeRefreshingCache<Integer>(o);
    Observable<Integer> o2 = Observable.create(cacher);
    o2.subscribe(System.out::println);
    o2.subscribe(System.out::println);
    cacher.reset();
    o2.subscribe(System.out::println);
    

    输出:

    completed
    1
    1
    completed
    1
    

    顺便说一句,您可能会注意到 .cache 直到完成才会发出。这是一个应该由 rxjava 1.0.14 修复的错误。

    就您的 GC 压力问题而言,每个运算符在应用于 Observable 时通常通过 liftcreate 创建一个新的 Observable。与创建新 Observable 相关的基本成员状态是对 onSubscribe 函数的引用。 cache 与大多数不同之处在于它跨订阅保存状态,如果它保存大量状态并且经常被丢弃,这可能会产生 GC 压力。即使您使用相同的可变数据结构在重置时保持状态,GC 仍然必须在清除时处理数据结构的内容,因此您可能不会获得太多收益。

    RxJava cache 运算符是为多个并发订阅而构建的。您可能会想象重置功能可能难以实现。如果您想进一步探索,请务必在 RxJava github 上提出问题。

    【讨论】:

    • 清理了一点
    • 看起来很整洁!然而,通过这样做,我们有一个更优雅和 rx-ish 的 api,但我认为我们只是隐藏了可观察的娱乐。如果我没有读错 rxjava 的源代码,.cache() 会返回一个新的(缓存的)observable:github.com/ReactiveX/RxJava/blob/…
    • 再次感谢您的回答,再准确不过了:-)
    猜你喜欢
    • 1970-01-01
    • 2017-10-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-24
    • 2020-10-08
    相关资源
    最近更新 更多