【问题标题】:Retrying a polling service with n seconds delay延迟 n 秒重试轮询服务
【发布时间】:2018-08-15 19:09:01
【问题描述】:
private pollSubscriptions: Subscription;
private defaultPollTime: number = 2000;
constructor(
    private http: HttpClient,
) {
    this.pollSubscriptions = new Subscription();
}

pollRequest<T>(
    url: string,
    updateStatus: any,
    pollWhileCondition: Function,
    onPollingSuccessCallback?: Function,
    timer = this.defaultPollTime
) {
    this.pollSubscriptions.add(timer(0, 2000).pipe(
        switchMap(() => this.http.get<T>(url).pipe(
            catchError((error: any) => empty()))),
        tap(updateStatus),
        takeWhile(data => pollWhileCondition(data)))
        .subscribe());
}

ngOnDestroy(): void {
    this.pollSubscriptions.unsubscribe();
}

我可以同时轮询多个网址。但是如何增强当前的功能以满足以下要求:

  1. 如果被轮询的 url 失败,那么我们如何在 3(n) 秒的延迟下重试轮询 url 3 次?
  2. 我们如何在被轮询的 url 上添加不同的运算符?

仍然没有解决方案

提前致谢

【问题讨论】:

  • 仍然没有解决方案谢谢

标签: javascript angular rxjs reactive-programming rxjs6


【解决方案1】:

希望这会有所帮助..

有几点:

  1. 如果您希望所有投票同时进行,您可能需要使用共享计时器进行投票。所以我在下面添加了pollWhen 属性。
  2. 您正在寻找retryWhen,这是一个很难理解的运算符,但基本上它的工作原理是这样的:当您订阅时,它会调用您传递的函数,并带有可观察到的潜在错误,当发出错误时,您应该将其擦洗为“下一个”值以重试,立即完成(例如空)以安静地完成,或错误(例如 throwError)以错误地杀死可观察对象。
class Component() {
  /** ticks every 10 seconds */
  pollWhen = timer(0, 10000)
    .pipe(share());

  private pollSubscriptions: Subscription;

  constructor(
      private http: HttpClient,
  ) {
      this.pollSubscriptions = new Subscription();
  }

  pollRequest<T>(
      url: string,
      updateStatus: any,
      pollWhileCondition: Function,
      onPollingSuccessCallback?: Function,
      timer = this.defaultPollTime
  ) {
      this.pollSubscriptions.add(this.pollWhen.pipe(
      switchMap(() =>
        this.http.get<T>(url).pipe(
          // Setup retries
          retryWhen(
            errors => errors.switchMap(
              // if more than 3 retries,
              // stop retrying quietly
              (_, i) => i < 3
                ? timer(1000)
                : EMPTY
            )
          )
        )
      ),
      tap(updateStatus),
      takeWhile(pollWhileCondition)
    ).subscribe());
  }

  ngOnDestroy(): void {
    this.pollSubscriptions.unsubscribe();
  }
}

【讨论】:

    【解决方案2】:

    要回答您的第一个问题,您可以使用 retryWhen 运算符并替换 catch。 对于 2 个问题,方法需要一些重写,您可以将 pollRequest 更改为 subject() 以存储并将不同的 url 发送到流进行处理。

        var pollUrl = new rxjs.Subject()
    const updateStatus = () => true
    const pollWhileCondition = () => true
    const http = url => {
      console.log('http call...',url)
      return rxjs.timer(1000).pipe(
      rxjs.operators.tap(()=>{
        throw "http call error"
      })
      )
    }
    const distinctUrl = pollUrl.pipe(rxjs.operators.distinct())
    
    distinctUrl.pipe(
      rxjs.operators.mergeMap(url =>  {
        return rxjs.timer(0, 2000).pipe(rxjs.operators.map(() => url))
      }),
      rxjs.operators.tap(()=>console.log('xxx')),
      rxjs.operators.mergeMap(url => http(url).pipe(
      rxjs.operators.retry(3),
       )),
        rxjs.operators.catchError(()=>rxjs.empty()),
        rxjs.operators.repeat()
    ).subscribe(()=>{},err=>{
    console.warn(err)
    },()=>console.log('comple'))
    
    pollUrl.next('http://google.com')
    setTimeout(()=> pollUrl.next('http://twitter.com') ,7000)
    

    http://jsfiddle.net/cy0nbs3x/1535/

    【讨论】:

    • 太棒了!!我们如何通过上述实现将次数限制为 3。能否请您详细说明点解决方案@Fan Cheung
    • 更新了,但是大括号和括号很乱,希望你明白
    • 能否详细说明问题的第二部分?
    • 哪一部分让您感到困惑?也请看看learnrxjs.io/operators/error_handling/retrywhen.html
    • 感谢范!!!重试什么时候工作!!!我在问我们如何在被轮询的 url 上添加不同的运算符?
    【解决方案3】:

    不是完整的解决方案,但在这里我可以实现重试 3 次,延迟为 3 秒。我仍在寻找如何使活动的投票网址与众不同。非常感谢任何帮助。

    import { timer as observableTimer, Subscription, interval, of, concat, Observable } from 'rxjs';
    import { takeWhile, tap, take, switchMap, repeat, retryWhen, scan, mapTo, expand, exhaustMap } from 'rxjs/operators';
    
    @Injectable()
    export class ReportPollingService {
    
        private pollSubscriptions: Subscription;
        constructor(private http: HttpClient){}
    
        pollRequest<T>(url: string, updateStatus: any, pollWhileCondition: Function){
         if (this.pollSubscriptions.closed) {
           this.pollSubscriptions = new Subscription();// re-open polling
         }
         const request$ = this.http.get<T>(url);
         const firstRequest$ = request$;
         const polling$ = interval(options.interval).pipe(
             take(1),
             exhaustMap(() => request$),
             repeat()
         );
         this.pollSubscriptions.add(concat(firstRequest$, polling$).pipe(
             retryWhen(errors$ => {
                 return errors$.pipe(
                      scan(
                          ({ errorCount, error }, err) => {
                              return { errorCount: errorCount + 1, error: err };
                          },
                          { errorCount: 0, error: null }
                      ),
                      switchMap(({ errorCount, error }) => {
                          if (errorCount >= 3) {
                               throw error;
                          }
                          return observableTimer(3000, null);
                      })
                  );
             }),
         ).pipe(tap(updateStatus), takeWhile(data => pollWhileCondition(data))).subscribe());
       }
    
       stopPolling(): void {
           this.pollSubscriptions.unsubscribe();
       }                  
    

    【讨论】:

      猜你喜欢
      • 2023-04-10
      • 2020-04-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-09
      • 2020-06-17
      相关资源
      最近更新 更多