【发布时间】:2015-11-14 04:29:58
【问题描述】:
我有一个使用以下代码创建的可观察对象。
Observable.create(new Observable.OnSubscribe<ReturnType>() {
@Override
public void call(Subscriber<? super ReturnType> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(performRequest());
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
performRequest() 将按照您的预期执行长时间运行的任务。
现在,由于我可能会在很短的时间内启动 same Observable 两次或更多次,因此我决定编写这样的转换器:
protected Observable.Transformer<ReturnType, ReturnType> attachToRunningTaskIfAvailable() {
return origObservable -> {
synchronized (mapOfRunningTasks) {
// If not in maps
if ( ! mapOfRunningTasks.containsKey(getCacheKey()) ) {
Timber.d("Cache miss for %s", getCacheKey());
mapOfRunningTasks.put(
getCacheKey(),
origObservable
.doOnTerminate(() -> {
Timber.d("Removed from tasks %s", getCacheKey());
synchronized (mapOfRunningTasks) {
mapOfRunningTasks.remove(getCacheKey());
}
})
.cache()
);
} else {
Timber.d("Cache Hit for %s", getCacheKey());
}
return mapOfRunningTasks.get(getCacheKey());
}
};
}
这基本上将原始.cache 可观察到HashMap<String, Observable>。
这基本上不允许具有相同getCacheKey()(例如login)的多个请求并行调用performRequest()。相反,如果第二个 login 请求到达而另一个正在进行中,则第二个可观察请求将被“丢弃”,而将使用已经运行的请求。 => 所有对onNext 的调用都将是cached 并发送给两个订阅者,实际上只访问我的后端一次。
现在,假设以下代码:
// Observable loginTask
public void doLogin(Observable<UserInfo> loginTask) {
loginTask.subscribe(
(userInfo) -> {},
(throwable) -> {
if (userWantsToRetry()) {
doLogin(loinTask);
}
}
);
}
loginTask 是由前一个转换器组成的。好吧,当发生错误(可能是连接)和userWantsToRetry() 时,我基本上会用相同的可观察对象重新调用该方法。不幸的是,它已被缓存,我将收到相同的错误,而无需再次点击 performRequest(),因为该序列被重播。
有没有一种方法可以让转换器提供给我的“相同请求分组”行为和重试按钮?
【问题讨论】:
标签: reactive-programming rx-java