【问题标题】:Rxjs Retry with Delay function带有延迟功能的 Rxjs 重试
【发布时间】:2017-12-12 05:52:56
【问题描述】:

我正在尝试将 retrydelay 函数一起使用,我希望函数会在 1000 毫秒延迟后调用,但它没有,这可能是什么错误? 查看控制台输出,时间是 16:22:48。

我预计那里是 16:22:48、16:22:59 ...

canCreate: boolean;
getSomeFunction(): Observable<boolean> {
        return new Observable<boolean>(
            observer => {
                const canCreate = null; // this is just null for now, will some value later
                if (canCreate == null) {
                    observer.error('error');
                } else {
                    observer.next(true);
                }
                observer.complete();
            }
        )
    }


this.getSomeFunction()
      .do((value) => {
        this.cCreate = value;
      }, (error) => {
         console.log(error + new Date().toTimeString());
      })
      .delay(1000)
      .retry(10)
      .subscribe(
        value => this.cCreate = value,
        error => {
          this.cCreate = false;
        },
        () => {}
      );
  }

控制台结果是:

【问题讨论】:

    标签: angular typescript rxjs


    【解决方案1】:

    delay() 用于在 observable 发出的事件之间引入延迟。但是 observable 永远不会发出任何事件。它只是立即出错。

    您要查找的是retryWhen(),它允许决定在多长时间后重试:

    RxJS 5:

      .retryWhen(errors => errors.delay(1000).take(10))
    

    RxJS 6:

    import { retryWhen, delay, take } from 'rxjs/operators'
    someFunction().pipe(
      // ...
      retryWhen(errors => errors.pipe(delay(1000), take(10)))
    )
    

    这将在 10 次尝试后完成整个 observable。如果你想在 10 次尝试后使整个 observable 出错,retryWhen 回调返回的 observable 必须抛出:

    RxJS 5:

      .retryWhen(errors => errors.delay(1000).take(10).concat(Observable.throw()))
    

    RxJS 6:

    import { retryWhen, delay, take, concatMap, throwError } from 'rxjs/operators'
    someFunction().pipe(
      // ...
      retryWhen(errors => errors.pipe(delay(1000), take(10), concatMap(throwError)))
    )
    

    【讨论】:

    • errors的类型是observable&lt;any&gt;,它没有complete方法
    • 我误解了文档。现在应该可以了。见plnkr.co/edit/HTpcxkH9MqKqAsVep2na?p=preview
    • 这里是较新 Angular/RxJS 版本的完整代码:pipe( retryWhen(error =&gt; error.pipe(delay(1000), take(3), concat(Observable.throw(error)))), catchError(myErrorHandler.bind(this)) )
    • 401错误要停止重试怎么办?
    • 重投不正常。
    【解决方案2】:

    补充@JB Nizet 的答案。如果你是在 rxjs 5+ 中使用 lettable 操作符编写的,那么将其结构化为

    retryWhen(errors =&gt; errors.pipe(delay(1000), take(5)))

    【讨论】:

    • 知道.concat(Observable.throw())) 的等价物是什么吗?
    • @EdBordin 如果我猜我会说....pipe(delay(1000), take(5), concat(Observable.throw()))
    • 谢谢!非常接近:pipe(delay(1000), take(5), concat(Observable.throw('error message seems to be required')))
    • @EdBordin @DavidTheProgrammer 任何想法如何使用 RxJS 6 和已弃用的“实例 concat()”来正确处理?我得到了retryWhen(errors =&gt; concat(errors.pipe(delay(750), take(2)), throwError(errors)))),但它不会解开原始错误。
    • 无论如何我也可以获得错误索引吗?我想增加后续错误的延迟。
    【解决方案3】:

    所有这些都是 RxJS 6+


    TL;DR

    您可以使用此包中经过全面测试的运算符,或向下滚动查看源代码 :)

    npm i rxjs-boost
    
    import { retryWithDelay } from 'rxjs-boost/operators';
    
    obs$.pipe(
      // will retry 4 times with a 1s delay before each try:
      retryWithDelay(1000, 4)
    );
    

    标准

    由于大多数(或者可能没有)其他答案不符合我的所有标准,我将在下面列出我的解决方案。目标:

    • 如果没有抛出错误,则定期发出并完成。 ✅
    • 如果抛出错误,重试x 次。 ✅
    • 在每次重试之前有y 的延迟。 ✅
    • 返回最后发出的错误。(很多其他答案都在努力解决这个问题。)✅
    • 使用strict: true 正确输入 - 但这很难搞砸。 ✅

    解决方案

    与其他答案一样,我们将使用retryWhen 运算符来捕获错误。要跟踪重复次数可以使用scan 运算符。为了限制重复次数,我们只需在 map 运算符中抛出一个错误。

    原始来源使用throwIf,但在这种情况下,我们可以简单地使用来自rxjs-boostretryWithDelay

    最后我们将使用delay 运算符来添加不同执行之间的延迟:

    import { MonoTypeOperatorFunction } from 'rxjs';
    import { delay as delayOperator, map, retryWhen, scan } from 'rxjs/operators';
    
    export function retryWithDelay<T>(
      delay: number,
      count = 1
    ): MonoTypeOperatorFunction<T> {
      return (input) =>
        input.pipe(
          retryWhen((errors) =>
            errors.pipe(
              scan((acc, error) => ({ count: acc.count + 1, error }), {
                count: 0,
                error: undefined as any,
              }),
              map((current) => {
                if (current.count > count) {
                  throw current.error;
                }
                return current;
              }),
              delayOperator(delay)
            )
          )
        );
    }
    

    来源

    【讨论】:

    • 'rxjs-boost/operators' 导入它不起作用。我必须从'rxjs-boost/lib/operators' 导入它。
    【解决方案4】:

    我得出这个结论,是为了用http管道中的其他操作重试

    import {delay as _delay, map, retryWhen} from 'rxjs/operators';
    
    export const delayedRetry = (delay, retries = 1) => retryWhen(result => {
        let _retries = 0;
        return result.pipe(
          _delay(delay),
          map(error => {
            if (_retries++ === retries) {
              throw error;
            }
            return error;
          }),
        );
      },
    );
    

    用法

        http.pipe(
          delayedRetry(1500, 2),
          catchError((err) => {
            this.toasterService.error($localize`:@@not-saved:Could not save`);
            return of(false);
          }),
          finalize(() => this.sending = false),
        ).subscribe((success: boolean) => {
            if (success === true) {
               this.toasterService.success($localize`:@@saved:Saved`);
            }
          }
        });
    

    【讨论】:

    • 我有一些类型错误,但是像这样调整它,修复它:export function delayedRetry&lt;T&gt;(delay: number, retries = 1): MonoTypeOperatorFunction&lt;T&gt; { return (input) =&gt; input.pipe( retryWhen((errors) =&gt; { let retriesInner = 0; return errors.pipe( delayOperator(delay), map((error) =&gt; { if (retriesInner++ === retries) { throw error; } return error; }), ); }), ); }
    【解决方案5】:

    对于ngrx5+,我们可以创建运算符:

    
    function retryRequest(constructor: () => Observable, count: number, delayTime: number) {
      let index = 0;
      return of(1) // we need to repeat not the result of constructor(), but the call of constructor() itself
        .pipe(
          switchMap(constructor),
          retryWhen(errors => errors.pipe(
            delay(delayTime),
            mergeMap(error => {
              if (++index > count) {
                return throwError(error);
              }
              return of(error);
            })
          ))
        );
    }
    

    【讨论】:

      【解决方案6】:

      我最近遇到了这个问题,发现可以改进接受的解决方案。

      Observable.pipe(
           retryWhen(errors => errors.pipe(
            delay(1000),
            take(10))),
          first(v => true),
          timeout(10000))
      

      它本质上所做的是如上所述重试,但这会立即完成,而不会使用“第一个”运算符添加任何(错误)值。

      如果在超时时间内找不到值,则会引发错误。

      【讨论】:

        【解决方案7】:

        适用于 rxjs 版本 6.3.3

        https://stackblitz.com/edit/http-basics-8swzpy

        打开控制台并查看重试次数

        示例代码

        import { map, catchError, retryWhen, take, delay, concat } from 'rxjs/operators';
        import { throwError } from 'rxjs';
        
        
        export class ApiEXT {
        
            static get apiURL(): string { return 'http://localhost:57886/api'; };
            static httpCLIENT: HttpClient;
        
         static POST(postOBJ: any, retryCOUNT: number = 0, retryINTERVAL: number = 1000) {
                return this.httpCLIENT
                    .post(this.apiURL, JSON.stringify(postOBJ))
                    .pipe(
                        map(this.handleSUCCESS),
                        retryWhen(errors => errors.pipe(delay(retryINTERVAL), take(retryCOUNT), concat(throwError("Giving up Retry.!")))),
                        catchError(this.handleERROR));
            }
        
        
          private static handleSUCCESS(json_response: string): any {
                //TODO: cast_and_return    
                return JSON.parse(json_response);
        
            }
        
         private static handleERROR(error: Response) {
                let errorMSG: string;
                switch (error.status) {
                    case -1: errorMSG = "(" + error.status + "/" + error.statusText + ")" + " Server Not Reachable.!"; break;
                    default: errorMSG = "(" + error.status + "/" + error.statusText + ")" + " Unknown Error while connecting with server.!"; break;
                }
                console.error(errorMSG);
                return throwError(errorMSG);
            }
        
        }
        

        【讨论】:

          【解决方案8】:

          这可能对你有帮助

          let values$ = Rx.Observable.interval(1000).take(5);
          let errorFixed = false;
          
          values$
          .map((val) => {
             if(errorFixed) { return val; }
             else if( val > 0 && val % 2 === 0) {
                errorFixed = true;
                throw { error : 'error' };
          
             } else {
                return val;
             }
          })
          .retryWhen((err) => {
              console.log('retrying again');
              return err.delay(1000).take(3); // 3 times
          })
          .subscribe((val) => { console.log('value',val) });
          

          【讨论】:

          • 这是无限次调用,我只需要重试10次。
          【解决方案9】:

          RxJS 提供了retry 操作符,当出现错误时,它会为给定的计数重新订阅 Observable。在抛出错误之前 Observable 被retry 运算符重新订阅给定的计数,如果仍然存在错误,则抛出错误。 retry 对于多次点击 URL 很有用。有可能由于网络带宽的原因,URL 一次没有返回成功的数据,当它重新连接时,可能会成功返回数据。如果重新绑定后 Observable 中仍然存在错误,则可以使用 catchError 返回带有用户定义的默认数据的 Observable。

          getBook(id: number): Observable<Book> {
            return this.http.get<Book>(this.bookUrl + "/" + id).pipe(
               retry(3),
               catchError(err => {
                console.log(err);
                return of(null);
               })
            );
          }
          

          【讨论】:

          • 如何使用延迟重试,在我的情况下只有重试有效
          【解决方案10】:

          我使用retryWhenObservable.Interval 提出了以下解决方案,但在此解决方案中,订阅的error 函数从不调用,

          this.branchService.getCanCreate()
            .do((value) => {
              this.cCreate = value;
            }, (error) => {
              console.log('do', error + new Date().toTimeString());
            })
            .retryWhen(errors => {
              return Observable.interval(1000).take(3).concat(Observable.throw('error')));
            })
            .subscribe(
              value => {
                this.cCreate = !!value
                console.log('success', new Date().toTimeString());
              },
              error => {
                console.log('subscribe', error + new Date().toTimeString());
                this.cCreate = false;
              },
              () => {
                console.log('finally', new Date().toTimeString());
              }
            );
          

          【讨论】:

            猜你喜欢
            • 2021-01-13
            • 2012-01-10
            • 2012-03-01
            • 2017-07-18
            • 1970-01-01
            • 2011-05-02
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多