【问题标题】:How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?如何构建一个在前一个 ajax 承诺解决后等待一段时间的 rx 轮询器?
【发布时间】:2016-06-21 18:00:33
【问题描述】:

一直在努力解决这个问题。基本上,我不希望轮询器在轮询开始后每 30 秒启动一次 ajax——我希望轮询器在前一个请求返回后 30 秒启动请求。另外,我想围绕失败的指数回退制定一些策略。

这是我目前所拥有的 (Rx4):

rx.Observable.create(function(observer) {
    var nextPoll = function(obs) {
      // the action function invoked below is what i'm passing in
      // to my poller service as any function which returns a promise
      // and must be invoked each loop due to fromPromise caching
      rx.Observable.fromPromise(action())
        .map(function (x){ return x.data; })
        .subscribe(function(d) {       
            // pass promise up to parent observable  
            observer.onNext(d);

            // reset interval in case previous call was an error
            interval = initInterval;   
            setTimeout(function(){ nextPoll(obs); }, interval);
        }, function(e) {
          // push interval higher (exponential backoff)
          interval = interval < maxInterval ? interval * 2 : maxInterval;
          setTimeout(function(){ nextPoll(obs); }, interval);

        });
    };
    nextPoll(observer);
});

在大多数情况下,这可以满足我的需求。我不喜欢使用 setTimeout,但我似乎找不到更好的 Observable 方法(除了一次性间隔/计时器与另一个订阅)。

我无法解决的另一件事是能够控制轮询器在最初启动时是否可以延迟启动或立即触发。对于某些用途,我将在开始轮询之前获取数据,因此我可以让它在第一次触发之前等待一段时间。到目前为止,我只有在第一个 ajax 之前或在 ajax 之间发生的计时器/延迟并将其提供给订阅者,这对我不起作用。

如果有任何关于清理这个问题的想法,无论是在一般情况下还是在摆脱 setTimeout 方面,都将不胜感激。而且,如果有人有办法以可选的延迟启动此轮询器,那将是巨大的!谢谢大家!!

更新:终于按照我的设想完成了这项工作。看起来是这样的:

function computeInterval(error) {
  if (error) {
    // double until maximum interval on errors
    interval = interval < maxInterval ? interval * 2 : maxInterval;
  } else {
    // anytime the poller succeeds, make sure we've reset to
    // default interval.. this also allows the initInterval to 
    // change while the poller is running
    interval = initInterval;
  }
  return interval;
}

poller$ = rx.Observable.fromPromise(function(){ return _this.action(); })
  .retryWhen(function(errors){
    return errors.scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval(true));
      });
  })
  .repeatWhen(function(notification){
    return notification
      .scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval());
      });
  });

【问题讨论】:

    标签: javascript rxjs


    【解决方案1】:

    我使用另一种可能有用和/或更简单的技术添加了另一个答案。它基于repeatWhen 运算符。正如documentation 解释的那样,它允许您重复订阅一个可观察对象,直到您发出重复结束信号,表明正常完成或错误。

    var action; // your action function
    Rx.Observable.create(function (observer) {
        function executeAction(action) {
            return Rx.Observable.fromPromise(action());
        }
    
        function computeDelay(){
            // put your exponential delaying logic here
        }
    
    executeAction()
      .repeatWhen(function(notification){
        return Rx.Observable.return({}).delay(computeDelay());
      })
      .subscribe(function(x){
        observer.onNext(x.data);
      })
    });
    

    这里有一个terrific article 解释repeatWhenretryWhen 比官方文档更好(它适用于RxJava,但它适用于Rxjs,稍作修改)。它还提供了一个示例,说明您想要实现的目标(指数延迟重复)。

    【讨论】:

    • 嗯.. 还没有到那里,但这肯定是有帮助的。当我最终确定我的解决方案时,我会在这里发布一些东西。再次感谢!
    • 没有到达是什么意思?我希望这将是一个更简单的解决方案。
    • 我刚刚更新了我的初始帖子,说明我需要做些什么才能真正让它正常工作。感谢您的所有帮助。
    【解决方案2】:

    只是快速思考了一下,所以必须对其进行测试,但希望它能让你走上一条有价值的道路:

    var action; // your action function
    Rx.Observable.create(function (observer) {
        function executeAction(action) {
            return Rx.Observable.fromPromise(action()).materialize();
        }
    
        function computeDelay(){
            // put your exponential delaying logic here
        }
    
        executeAction()
            .expand(function (x) {
                return Rx.Observable.return({})
                    .delay(computeDelay())
                    .flatMap(function(){return executeAction(action);})
            })
            .subscribe(function(notification){
                 if (notification.kind === "N") {
                   observer.onNext(notification.value.data);
                 } else if (notification.kind === "E") {
                   console.log("error:", notification.error.message);
                 }
            });
    });
    

    简而言之,这个想法是使用expand 运算符进行循环,使用delay 运算符进行延迟。看看documentation。使用 materialize 运算符和通知机制管理错误(这可以避免在 promise 返回错误的情况下突然终止您的轮询流)。

    【讨论】:

    • 嘿,所以我认为您使用 expand 是对的。不幸的是,我被困在这个内部循环中,在.expand 之后没有订阅就无法运行。显然,ajax 调用发生了一次,但是如果没有 executeAction.expand().subscribe(),expand 永远不会拾取它。此外,如果承诺错误,循环将退出。以前,在这里使用订阅,即使出现错误情况,我也能继续运行。
    • 是的,很抱歉您必须订阅,没有办法解决它(除了使用主题,但我没有看到订阅问题 - 它给您带来了什么问题?)。让我稍后再考虑错误管理。顺便更新了代码示例。
    • 有几种处理错误的解决方案。我在这里给了你一个,它使用了materialize 运算符。您还可以使用catch 运算符来捕获错误并将错误代码或对象传递到流中。您可以在此处查看问题以了解类似问题及其解决方案:github.com/Reactive-Extensions/RxJS/issues/829
    • 非常感谢这些想法。 retryWhen 呢?似乎这适合这样的事情。 (抱歉,Rx 新手.. 吸收了很多东西!)
    • retryWhen 可能很难使用,因此请确保您知道如何使用它。在您的情况下,您正在轮询,所以您已经在重试了吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-03-30
    • 2015-06-26
    • 2020-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多