【问题标题】:Retrofit2+RxJava2, Invalid token, how to update stream when retryWhen() re-subscribeRetrofit2+RxJava2,无效令牌,retryWhen() 重新订阅时如何更新流
【发布时间】:2017-04-19 06:26:21
【问题描述】:

我在下面有这个简单的代码,它模拟了我目前正在尝试完成的场景

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {

                @Override
                public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return mApiService.api().getAccessToken();
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Void value) {
                }

                @Override
                public void onError(Throwable e) {

                    e.printStackTrace();
                    onError(e);
                }

                @Override
                public void onComplete() {
                }
            });

我只是列举它以明确我的目标:

  1. 使用当前访问令牌执行 POST 调用
  2. 如果收到适当的错误(404,403、401 等)
  3. 执行 GET 调用以获得新的访问令牌
  4. 使用新的访问令牌重试整个序列

基于上面的代码和我目前对 .retryWhen() 的理解,如果原始 Observable( .postSomethingWithAccessToken()),并在必要时重试(根据您在重试中的条件),这里发生的情况是 .retryWhen() 在外部 Observable 之前首先执行,导致不需要的重复要求, 根据我目前的理解(代码),我怎样才能实现上面提到的那些事情?任何帮助将不胜感激。 :(

编辑:当前解决方法:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

                @Override
                public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {

                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            if (throwable instanceof HttpException) {

                                HttpException httpException = (HttpException) throwable;

                                if (httpException.code() == 401) {

                                    return mApiService.api().getAccessToken()
                                            .doOnNext(new Consumer<Authentication>() {
                                                @Override
                                                public void accept(Authentication authentication) throws Exception {
                                                    update(authentication);
                                                }
                                            });
                                }
                            }

                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onNext(Void value) {
                    Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                    Log.e("Complete", "____ COMPLETE");
                }
            });

通过共享偏好更新令牌的方法

public void update(Authentication authentication) {
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}

我注意到(我放了一个日志)外部 observable 的订阅和 retryWhen 是在主线程执行的,但是重试/重新订阅的流正在跳过不同的调度程序的线程,这似乎是一个竞争条件:(

    onSubscrbie_outer_observable: Thread[main,5,main]
    RetryWhen: Thread[main,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    // and so on...

【问题讨论】:

    标签: rx-java retrofit2 rx-java2 reactive resque-retry


    【解决方案1】:

    这里有几个问题:

    • 重试时您需要将访问令牌传回postSomethingWithAccessToken 方法,否则您将使用相同的旧无效访问令牌重试。
    • 当你的重试逻辑不正确时,你必须回应你得到的错误Observable并将你的重试逻辑放在那里。正如您所说,此方法首先执行,而不是在发生错误时执行,throwableObservable 是对错误的响应,它将错误反映为排放(onNext()),您可以flatMap() 每个错误和响应错误(用于将错误传递到源流)完成,或使用 onNext() 和一些对象发出信号以重试。
      在这个主题上很棒的blog post ban Dan Lew

    所以你需要:
    1) 将访问令牌存储在您可以通过访问令牌刷新对其进行更改的位置。
    2) 修复逻辑正确响应错误时的重试

    这是一个建议代码:

    postSomethingWithAccessToken(request, accessToken)
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                       @Override
                       public ObservableSource<?> apply(
                               @NonNull Observable<Throwable> throwableObservable) throws Exception {
                           return throwableObservable.flatMap(
                                   new Function<Throwable, ObservableSource<? extends R>>() {
                                       @Override
                                       public ObservableSource<? extends R> apply(
                                               @NonNull Throwable throwable) throws Exception {
                                           if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
                                               return getAccessToken()
                                                               .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
                                                       //or keep accessToken on some field, the point to have mutable
                                                       //var that you can change and postSomethingWithAccessToken can see
                                           }
                                           return Observable.error(throwable);
                                       }
                                   });
                           }
                       }
            )
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<Result>() {
                           @Override
                           public void accept(@NonNull Result result) throws Exception {
                               //handle result
                           }
                       }
            );
    

    【讨论】:

    • 您好,非常感谢您提供代码示例并指出错误点,如果要求不高,您介意将代码更改为非 lambda 吗?,
    • 非常感谢!我会努力的,我会尝试重新编写我的代码。 :) ,也感谢您的链接 :)
    • 只是一个问题,retryWhen() 先执行真的正常吗?在实际的外部可观察执行之前? - 我不知道我是否理解正确 -> “在订阅时调用工厂 Func1 以设置重试逻辑。这样,当调用 onError 时,您已经定义了如何处理它。”
    • 是的,正如它所解释的,重试逻辑首先由您的 Func1 方法构造,然后 throwableObservable 将订阅,然后才是源 Observable。这就是为什么您不能在那里执行实际工作的原因,您应该将重试逻辑集成到 throwableObservable 流中。这意味着重试是基于对 throwableObservable 错误发射的反应,而不是对 Func1 方法体的反应。
    • 您建议的代码完全可以工作,我只是在更新令牌时遇到了一些问题,即使是静态变量或对象字段也使得重试不停,无论如何,“重试逻辑首先由你的 Func1 方法,然后 throwableObservable 将订阅,然后才订阅源 Observable。” - 谢谢,这对我来说是一个很好的开始:) :) 谢谢谢谢
    【解决方案2】:

    非常感谢 yosriz 为我指出了正确的方向来解决我的磨牙问题,我必须使用defer。所以我最终在 GitHub 中解决了这个问题,Why resubscribe the source observable emit same output when I use retryWhen operator?

    这与我现在遇到的问题完全相同,对于遇到相同问题的任何人来说,这里都是我的解决方案。

    Observable
        .defer(new Callable<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> call() throws Exception {
                // return an observable source here, the observable that will be the source of the entire stream;
            }
        })
        .subscribeOn( /*target thread to run*/ )
        .retryWhen( {
            // return a throwable observable here that will perform the logic when an error occurred
        })
        .subscribe( /*subscription here*/ )
    

    或者这里是我的解决方案的完整非 lambda

    Observable
        .defer(new Callable<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> call() throws Exception {
                return mApiService.api().postSomethingWithAccessToken(
                    request, preferences.getString("access_token", ""));
            }
        })
        .subscribeOn(Schedulers.io())
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        if (throwable instanceof HttpException) {
                            HttpException httpException = (HttpException) throwable;
                            if (httpException.code() == 401) {
                                return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
                                        @Override
                                        public void accept(Authentication authentication) throws Exception {
                                            update(authentication);
                                        }
                                    });
                            }
                        }
                        return Observable.error(throwable);
                    }
                });
            }
        })
        .subscribe(new Observer<Void>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
            }
    
            @Override
            public void onNext(Void value) {
                Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
            }
    
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
    
            @Override
            public void onComplete() {
                Log.e("Complete", "____ COMPLETE");
            }
        });
    

    这里的重点是“.retryWhen()运营商重新订阅源observable时如何修改/更新现有源observable”

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-05-04
      • 2023-03-29
      • 2019-03-12
      • 2012-08-25
      • 2012-01-18
      • 2015-09-27
      • 1970-01-01
      • 2019-05-01
      相关资源
      最近更新 更多