【发布时间】:2017-04-19 06:26:21
【问题描述】:
我在下面有这个简单的代码,它模拟了我目前正在尝试完成的场景
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
我只是列举它以明确我的目标:
- 使用当前访问令牌执行 POST 调用
- 如果收到适当的错误(404,403、401 等)
- 执行 GET 调用以获得新的访问令牌
- 使用新的访问令牌重试整个序列
基于上面的代码和我目前对 .retryWhen() 的理解,如果原始 Observable( .postSomethingWithAccessToken()),并在必要时重试(根据您在重试中的条件),这里发生的情况是 .retryWhen() 在外部 Observable 之前首先执行,导致不需要的重复要求, 根据我目前的理解(代码),我怎样才能实现上面提到的那些事情?任何帮助将不胜感激。 :(
编辑:当前解决方法:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
通过共享偏好更新令牌的方法
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
我注意到(我放了一个日志)外部 observable 的订阅和 retryWhen 是在主线程执行的,但是重试/重新订阅的流正在跳过不同的调度程序的线程,这似乎是一个竞争条件:(
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...
【问题讨论】:
标签: rx-java retrofit2 rx-java2 reactive resque-retry