【问题标题】:How to repeat a promise conditionally using rxjs如何使用 rxjs 有条件地重复承诺
【发布时间】:2022-02-16 16:52:58
【问题描述】:

我想重复一个返回 Promise 的 API 调用,有条件地使用 rxjs。

API 方法会收到一个 id,每次调用都会更改它,方法是为其添加一个计数器前缀。将重复调用,直到数据满足某些条件或计数器达到特定数字 X。如何使用 rxjs 完成?

API方法:

fetchData(id):Promise<data>

尝试 1:fetchData(id)

尝试 2:fetchData(id_1)

尝试 3:fetchData(id_2)

【问题讨论】:

    标签: javascript angular promise rxjs rxjs-observables


    【解决方案1】:

    IMO,最好通过 Promises 或 RxJS 来处理轮询,而不要混合它们。我将使用 RxJS 进行说明。

    试试下面的

    1. 使用 RxJS from 函数将 promise 转换为 observable。
    2. 使用 RxJS 函数(如 timerinterval)以固定间隔定期发出值。
    3. 使用像switchMap 这样的高阶映射运算符从外部发射映射到您的API 调用。有关不同类型的高阶映射运算符的简要说明,请参阅 here
    4. 使用两个takeWhile 运算符,分别对应您的每个条件,完成订阅。
    5. 使用filter 运算符仅转发满足条件的排放。
    import { from } from 'rxjs';
    
    fetchData(id: any): Observable<any> {  // <-- return an observable
      return from(apiCall);                // <-- use `from` to convert Promise to Observable
    }
    
    import { timer } from 'rxjs';
    import { filter, switchMap, takeWhile } from 'rxjs/operators';
    
    timer(0, 5000).pipe(                        // <-- poll every 5 seconds
      takeWhile((index: number) => index < 20)  // <-- stop polling after 20 attempts
      switchMap((index: number) => 
        this.someService.apiCall(index+1)       // <-- first emission from `timer` is 0
      ),
      takeWhile(                                // <-- stop polling when a condition from the response is unmet
        (response: any) => response.someValue !== someOtherValue,
        true                                    // <-- emit the response that failed the test
      ),
      filter((response: any) => 
        response.someValue === someOtherValue   // <-- forward only emissions that pass the condition
      )
    ).subscribe({
      next: (response: any) => {
        // handle response
      },
      error: (error: any) => {
        // handle error
      }
    });
    

    编辑:2ndtakeWhile 中的条件与要求相反。我已经调整了条件并包含了inclusive=true 参数。感谢 cmets 中的@Siddhant。

    【讨论】:

    • 第二个takeWhile 不会按照除此之外的评论工作。当response.someValue === someOtherValue 评估为false 而不是true 时,轮询将停止。此外,filter 运算符的使用是多余的。
    • 感谢您的回复@ruth。使用计时器的问题是第一次调用的响应可能需要比时间间隔更长的时间,它会在评估上一次调用的结果之前发送下一个请求。
    • @Siddhant:感谢您对 2ndtakeWhile 的意见。我已经调整了条件并包含了inclusive=true 参数。关于filter,如果没有它,订阅回调将针对 API 调用的所有排放触发。 OP没有提到它是否是必需的行为。所以我不会忽略filter,除非订阅回调也准备好处理未通过特定 OP 条件的 API 响应。
    • @Elham:在这种情况下,您可以尝试使用concatMap 而不是switchMap。请仔细阅读随附的答案以了解其背后的原因。 concatMap 将确保所有 API 调用都被串行触发,而 switchMap 将在 timer 发出时取消当前的内部订阅。
    • @ruth 以前 takeWhilefilter 具有相同的 response.someValue === someOtherValue 条件,因此当条件不满足时 takeWhile 不会发出值,因此 filter 具有相同的条件似乎没有任何用处。如果 OP 有用例,新更新 filter 将发挥作用。
    【解决方案2】:

    您可以使用 concatMap 确保一次只尝试一个调用。 range 给出了最大调用次数,因为如果条件满足/不满足,takeWhile 将提前取消订阅(在范围完成之前)。

    可能看起来像这样:

    // the data met some condition
    function metCondition(data){
      if(data/*something*/){
        return true;
      } else {
        return false
      }
    }
    
    // the counter reach to a specific number X
    const x = 30;
    
    range(0, x).pipe(
      concatMap(v => fetchData(`id_${v === 0 ? '' : v}`)),
      takeWhile(v => !metCondition(v))
    ).subscribe(datum => {
      /* Do something with your data? */
    });
    

    【讨论】:

      【解决方案3】:

      你可以试试 retryWhen:

      let counter=0;
      
      const example = of(1).pipe(
        switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
        map(val => {
          if (val < 5) {
            counter++;
            // error will be picked up by retryWhen
            throw val;
          }
          return val;
        }),
        retryWhen(errors =>
          errors.pipe(
            // log error message
            tap(val => console.log(`Response was missing something`)),
          )
        )
      );
      

      这并不理想,因为它需要在外部范围内有一个计数器,但在有更好的解决方案(尤其是没有基于时间的重试)之前,这应该可以工作。

      【讨论】:

      • firstValueFrom 将 observable 转换为 promise。我认为您正在寻找 from 函数。
      • 谢谢,让他们倒退将编辑 ^
      【解决方案4】:

      我知道您已经指定使用 rxjs,但是您还指定 fetchData() 返回 promise 而不是 observable。在这种情况下,我建议使用 asyncawait 而不是 rxjs。

        async retryFetch() {
          let counter = 0;
          while (counter++ < 20 && !this.data) {
            this.data = await this.fetchData(counter);
          }
        }
      

      你可以在条件中放任何你想要的东西。

      即使您的 api 调用返回了一个 observable,我仍然建议将它包装在一个 promise 中并使用这个非常易读的解决方案。

      下面的 stackblitz 用一个 Promise 包装了一个标准的 http.get 并实现了上述功能。承诺会随机返回数据或未定义。

      https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts

      【讨论】:

        【解决方案5】:
        let count = 0
        const timerId = setTimout( () =>{
           if(count){
              fetchData(`id_${count}`)
           }else{
              fetchData('id')
           }
           count = count + 1
        ,60000}) //runs every 60000 milliseconds
        
        const stopTimer = () =>{ //call this to stop timer
            clearTimeout(timerId);
        }
        

        【讨论】:

        • 绝对不是 RxJS 解决方案。
        • 普通的javascript可以和rxJs一起使用
        • 是的,但是 OP 询问如何使用 RxJS 实现他们的目标
        猜你喜欢
        • 2015-07-08
        • 1970-01-01
        • 2016-05-16
        • 1970-01-01
        • 1970-01-01
        • 2019-11-19
        • 2017-12-20
        • 2013-10-06
        • 1970-01-01
        相关资源
        最近更新 更多