【发布时间】:2016-06-21 18:00:33
【问题描述】:
一直在努力解决这个问题。基本上,我不希望轮询器在轮询开始后每 30 秒启动一次 ajax——我希望轮询器在前一个请求返回后 30 秒启动请求。另外,我想围绕失败的指数回退制定一些策略。
这是我目前所拥有的 (Rx4):
rx.Observable.create(function(observer) {
var nextPoll = function(obs) {
// the action function invoked below is what i'm passing in
// to my poller service as any function which returns a promise
// and must be invoked each loop due to fromPromise caching
rx.Observable.fromPromise(action())
.map(function (x){ return x.data; })
.subscribe(function(d) {
// pass promise up to parent observable
observer.onNext(d);
// reset interval in case previous call was an error
interval = initInterval;
setTimeout(function(){ nextPoll(obs); }, interval);
}, function(e) {
// push interval higher (exponential backoff)
interval = interval < maxInterval ? interval * 2 : maxInterval;
setTimeout(function(){ nextPoll(obs); }, interval);
});
};
nextPoll(observer);
});
在大多数情况下,这可以满足我的需求。我不喜欢使用 setTimeout,但我似乎找不到更好的 Observable 方法(除了一次性间隔/计时器与另一个订阅)。
我无法解决的另一件事是能够控制轮询器在最初启动时是否可以延迟启动或立即触发。对于某些用途,我将在开始轮询之前获取数据,因此我可以让它在第一次触发之前等待一段时间。到目前为止,我只有在第一个 ajax 之前或在 ajax 之间发生的计时器/延迟并将其提供给订阅者,这对我不起作用。
如果有任何关于清理这个问题的想法,无论是在一般情况下还是在摆脱 setTimeout 方面,都将不胜感激。而且,如果有人有办法以可选的延迟启动此轮询器,那将是巨大的!谢谢大家!!
更新:终于按照我的设想完成了这项工作。看起来是这样的:
function computeInterval(error) {
if (error) {
// double until maximum interval on errors
interval = interval < maxInterval ? interval * 2 : maxInterval;
} else {
// anytime the poller succeeds, make sure we've reset to
// default interval.. this also allows the initInterval to
// change while the poller is running
interval = initInterval;
}
return interval;
}
poller$ = rx.Observable.fromPromise(function(){ return _this.action(); })
.retryWhen(function(errors){
return errors.scan(function(acc, x) { return acc + x; }, 0)
.flatMap(function(x){
return rx.Observable.timer(computeInterval(true));
});
})
.repeatWhen(function(notification){
return notification
.scan(function(acc, x) { return acc + x; }, 0)
.flatMap(function(x){
return rx.Observable.timer(computeInterval());
});
});
【问题讨论】:
标签: javascript rxjs