【问题标题】:RxJS: How to loop and handle multiple http callRxJS:如何循环和处理多个 http 调用
【发布时间】:2023-03-07 15:03:01
【问题描述】:

我正在使用 NestJS。我想从分页 API 获取所有数据(我不知道总页数)。现在我使用while循环来获取所有数据,直到API返回204 No Content,这是我到目前为止的代码:

async getProduct() {
  let productFinal: ProductItem[] = [];
  let products: ProductItem[] = [];
  let offset = 1;
  let state = COLLECTING_STATE.InProgress;
  let retryCount = 1;
  
  do {
    const path = `product?limit=50&offset=${offset}`;

    products = await this.httpService
      .get(path, { headers, validateStatus: null })
      .pipe(
        concatMap((response) => {
          // if the statusCode is "204", the loop is complete
          if (response.status === 204) {
            state = COLLECTING_STATE.Finish;
          }

          // check if the response is error
          if (response.status < 200 || response.status >= 300) {
            // log error
            Logger.error(
              `[ERROR] Error collecting product on offset: ${offset}. StatusCode: ${
                response.status
              }. Error: ${JSON.stringify(response.data)}. Retrying... (${retryCount})`,
              undefined,
              'Collect Product'
            );

            // increment the retryCount
            retryCount++;

            // return throwError to trigger retry event
            return throwError(`[ERROR] Received status ${response.status} from HTTP call`);
          }

          // return the data if OK
          return of(response.data.item);
        }),
        catchError((err) => {
          if (err?.code) {
            // log error
            Logger.error(
              `Connection error: ${err?.code}. Retrying... (${retryCount})`,
              undefined,
              'Collect Product'
            );

            // increment the retryCount
            retryCount++;
          }
          return throwError(err);
        }),
        // retry three times
        retry(3),
        // if still error, then stop the loop
        catchError((err) => {
          Logger.error(
            `[ERROR] End retrying. Error: ${err?.code ?? err}`,
            undefined,
            'Collect Product'
          );
          state = COLLECTING_STATE.Finish;
          return of(err);
        })
      )
      .toPromise();

    // set retryCount to 1 again
    retryCount = 1;

    // check if products is defined
    if (products?.length > 0) {
      // if so, push the product to final variable
      productFinal = union(products, productFinal);
    }

    // increment the offset
    offset++;

    // and loop while the state is not finish
  } while ((state as COLLECTING_STATE) !== COLLECTING_STATE.Finish);

  return productFinal;
}

端点product?limit=50&amp;offset=${offset} 来自第三方服务,它没有一个端点来获取所有数据,所以这是唯一的方法,它的最大limitoffset 有50 个,并且它没有关于响应的nextPagetotalPage 信息,所以我必须使offset 变量并在上一个请求完成后递增它。

如何用 RxJS 操作符替换 while 循环?是否可以将其优化为一次发出多个请求(可能是四个或五个),从而减少获取所有数据的时间?

【问题讨论】:

  • 为什么在不需要的时候使用分页?只需从端点一次抓取所有内容。
  • 分页来自API,不是我的。 API 没有提供单个端点来获取所有这些数据
  • 然后给它一个上限值(比如max int值)。
  • 我想你没有明白我的意思,我上面的代码正在工作,我问我是否可以用 rxjs 运算符替换 while 循环。我想知道我是否可以优化它以一次发出多个请求
  • 只进行一次调用而不是 100、1k 或 10k 调用是优化。您的循环正在浪费您的资源 - 循环是否并行运行并不重要。

标签: rxjs nestjs


【解决方案1】:

基于来自RxJS Observable Pagination 的回答,但每次发出请求时增加offset

const { of, timer, defer, EMPTY, from, concat } = rxjs; // = require("rxjs")
const { map, tap, mapTo, mergeMap, take } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
function fetchPage({ limit, offset }) {
  // 204 resposne
  if (offset > 20) {
    return of({ status: 204, items: null });
  }

  // regular data response
  return timer(100).pipe(
    tap(() =>
      console.log(`-> fetched elements from ${offset} to ${offset+limit}`)
    ),
    mapTo({
      status: 200,
      items: Array.from({ length: limit }).map((_, i) => offset + i)
    })
  );
}

const limit = 10;
function getItems(offset = 0) {
  return defer(() => fetchPage({ limit, offset })).pipe(
    mergeMap(({ status, items }) => {
      if (status === 204) {
        return EMPTY;
      }
      const items$ = from(items);
      const next$ = getItems(offset + limit);
      return concat(items$, next$);
    })
  );
}

// process only first 100 items, without fetching all of the data
getItems()
  .pipe(take(100))
  .subscribe({
    next: console.log,
    error: console.error,
    complete: () => console.log("complete")
  });
&lt;script src="https://unpkg.com/rxjs@6.6.2/bundles/rxjs.umd.min.js"&gt;&lt;/script&gt;

关于发出并行请求的可能优化 - 我认为它不会很好地工作。相反,您可以在项目加载后逐步显示数据。或者按照 cmets 中的建议更改 API。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-05-26
    • 2018-12-27
    • 1970-01-01
    • 2018-01-07
    • 1970-01-01
    • 2021-06-27
    • 2015-07-27
    • 2013-12-10
    相关资源
    最近更新 更多