【问题标题】:RxJava Subject Cache Result With Retry重试的 RxJava 主题缓存结果
【发布时间】:2017-01-10 23:41:28
【问题描述】:

我有Observable<FeaturedItemList> getFeatured(),每次打开页面时都会调用它。从同一页面上的两个不同组件调用此函数。由于它是从网络中检索的,因此我对其进行了缓存并使其与ReplaySubject 共享。

public Observable<FeaturedItemList> getFeatured() {
    if(mFeaturedReplaySubject == null) {
        mFeaturedReplaySubject = ReplaySubject.create();
        getFromNetwork().subscribe(mFeaturedReplaySubject);
    }

    return mFeaturedReplaySubject;
}

然后我意识到,当请求由于某些原因失败时,如果用户返回该页面,除非用户终止应用程序,否则它不会显示任何结果。所以我决定有一些重试逻辑。这是我的工作:

public Observable<FeaturedItemList> getFeatured() {
    synchronized (this) {
        if (mFeaturedReplaySubject == null) {
            mFeaturedReplaySubject = ReplaySubject.create();
            getFromNetwork().subscribe(mFeaturedReplaySubject);

            return mFeaturedReplaySubject;
        } else {
            return mFeaturedReplaySubject.onErrorResumeNext(throwable -> {
                mFeaturedReplaySubject = null;
                return getFeatured();
            });
        }
    }
}

虽然这行得通,但恐怕我在这里做的不好,因为这种方法无法涵盖这种情况。 有没有更好的办法?

为了分享可观察的使用主题,我在某处读到了可以使用connect()publish()share(),但我不知道如何使用它。

【问题讨论】:

    标签: java rx-java rx-android


    【解决方案1】:

    代码

    private Observable<FeaturedItemList> mFeatured =
        // initialized on construction
        getFromNetwork()
            .retry(3) // number of times to retry
            .cache();
    
    public Observable<FeaturedItemList> getFeatured() {
        return mFeatured;
    }
    

    说明

    网络来源

    你的getFromNetwork() 函数应该返回常规的 observable,它应该在每次订阅时访问网络。 调用时不能上网。例如:

    Future<FeaturedItemList> makeAsyncNetworkRequest() {
        ... initiate network request here ...
    }
    
    Observable<FeaturedItemList> getFromNetwork() {
        return Observable.fromCallable(this::makeAsyncNetworkRequest)
            .flatMap(Observable::fromFuture);
    }
    

    重试

    有一系列.retryXxx() 运算符,仅在错误时激活。他们要么重新订阅源代码,要么将错误传递下去,这取决于各种条件。 如果没有错误,这些运算符什么也不做。我在示例中使用了简单的retry(count),它将重试指定的次数而不会延迟。您可以使用retryWhen() 添加延迟或任何复杂的逻辑(参见herehere 示例)。

    缓存

    cache() 操作员记录事件序列并将其重播给所有新订阅者。不好的是它不能刷新。它永远存储上游的结果,无论是成功还是错误,并且永远不会重试。

    替代cache()

    replay().refCount() 向所有现有订阅者重播事件,但在所有订阅者取消订阅(或完成)后立即忘记所有内容。当新订阅者到达时,它将重新订阅getFromNetwork()(当然会出现错误重试)。

    提及但不需要的操作员

    share()publish().refCount() 的简写。它允许多个并发订阅者共享单个订阅,即对subscribe() 进行一次调用,而不是为每个订阅者都这样做。 cache()replay().refCount() 都包含相同的功能。

    【讨论】:

    • getFeatured() 将从两个地方调用,最终调用subscribe()。由于 Observable 是不可变的,我想我必须使实例变量?就像我对主题所做的那样? mFeaturedObs = getFromNetwork().cache().share()share() 真的在这样做吗?
    • 至于retryWhen()getFromNetwork()应该只在当前活动实例第一次调用getFeatured()时重试,因为我不想每次都重试函数就在用户打开活动时被调用。但我认为如果不使用某种布尔变量就无法做到这一点,对吧?
    • 感谢您的补充说明!也用于提出replay().refCount()。有一种感觉,我会用这个代替cache()
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-10-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-22
    • 1970-01-01
    相关资源
    最近更新 更多