【问题标题】:How to switch from an interval based solution to a delay based using RxJS如何使用 RxJS 从基于间隔的解决方案切换到基于延迟的解决方案
【发布时间】:2021-04-29 15:31:16
【问题描述】:

我是 typescript 和 RxJs 的新手,并且被分配了一个错误,我们的搜索经常会因 HTTP 404 状态而失败。 尤其是当用户从远程运行应用程序时。

搜索调用是对一个后端进行的,该后端会衍生出长时间运行的搜索,这些搜索必须在完成之前进行轮询。这是通过在标题中提供 X-TaskId 来完成的。

我在下面发现了可疑的方法,我怀疑该方法的作用是每秒轮询一次,而不管搜索调用需要多长时间。 IE。轮询已完成但不存在的搜索 taskId 存在风险,这会给我们一个 404。

返回HTTP状态

  • 200:当搜索完成时,响应正文将包含结果。
  • 202:搜索正在运行,并且 X-TaskId 标头字段指示其 id。
  • 404:给定的搜索taskId不存在。
  performSearch(pollingInterval: number, searchCriterias: string[], search: MalSokningDto[], actor: string): void {
    this.scrollAndFocus(searchCriterias[0]);
    this.searchSubscription = this.searchService.search(search, null).subscribe(
      (initialResponse: HttpResponse<ResponseDto>) => {
        if (initialResponse.status === 202) {
          this.taskId = initialResponse.headers.get('X-TaskId');
          interval(pollingInterval)
            .pipe(
              switchMap(() => this.searchService.search(search, this.taskId)),
              filter((response: HttpResponse<ResponseDto>) => response.status === 200),
              take(1),
              timeout(30000)
            )
            .subscribe(
              (response: HttpResponse<ResponseDto>) => {
                this.loading = false;
                this.showSearchResult(searchCriterias, response.body, actor, null);
              },
              (error: Error) => {
                this.loading = false;
                this.showSearchResult(searchCriterias, null, actor, error);
              }
            );
        }
      },
      (error: Error) => {
        this.loading = false;
        this.showSearchResult(searchCriterias, null, actor, error);
      }
    );
  }

我对这段代码有很多疑问。

  1. 我如何(在 RxJS 异步中)样式切换以将其重构为添加等待上一个调用以及服务器调用之间的延迟。
  2. 存在冗余,因为我们对 searchService.search 进行了两次调用,对 showSearchResult 进行了 3 次调用。
  3. 我们存储了一个全局搜索订阅(我们稍后可能会取消它),但其他所有已创建的内容呢? (我们的搜索可能需要 10-30 秒,因此会有 20-30 次调用)我想我们的取消搜索也不总是有效。

【问题讨论】:

  • 加载解决方案似乎不太合适。
  • 那么,200 和 404 是同一个用例,对吧?这是否意味着搜索结束了?
  • Yes 200 是一个成功的搜索结果。 404表示你发送了一个taskId不存在的请求。

标签: typescript rxjs


【解决方案1】:
  import { Observable, Subject, timer } from 'rxjs';
  import { switchMap, takeUntil } from 'rxjs/operators';

  private refreshInterval$: Observable<any> = timer(0, 30000)
  private killTrigger: Subject<void> = new Subject();

  performSearch(pollingInterval: number, searchCriterias: string[], search: MalSokningDto[], actor: string): void {

  this.scrollAndFocus(searchCriterias[0]);
  this.searchSubscription =
    this.searchService.search(search, null)
    .subscribe(
        (initialResponse: HttpResponse < ResponseDto > ) => {
            this.refreshInterval$
                .pipe(
                    takeUntil(this.killTrigger),
                    switchMap(() => {
                            if (initialResponse.status === 202) {
                                this.taskId = initialResponse.headers.get('X-TaskId');
                                return this.searchService.search(search, this.taskId))
                        }
                    }),
        )
        .subscribe(
            (response: HttpResponse < ResponseDto > ) => {
                this.killTrigger.next();
                this.loading = false;
                this.showSearchResult(searchCriterias, response.body, actor, null);
            },
            (error: Error) => {
                this.loading = false;
                this.showSearchResult(searchCriterias, null, actor, error);
            }
        )
    },
    (error: Error) => {
        this.killTrigger.next();
        this.loading = false;
        this.showSearchResult(searchCriterias, null, actor, error);
    })

我相信这会回答您的问题。 在这里,我所做的是将一个名为 refreshInterval 的计时器可观察对象设为 30000。 和 killTrigger 主题。

现在您的函数调用 this.searchService.search(search, this.taskId)) 将每 30000 次调用一次。直到您通过 killtrigger.next()

希望这能解决您的问题。 我也在学习 rxjs,所以如果有人发现任何更简单或优化的方法,请随时编辑它。

【讨论】:

  • 好吧,我明天试试,原来的代码每秒轮询一次,30秒后放弃。我猜将 30000 更改为 pollingInterval 将解决轮询延迟。如何在 30 秒后添加放弃!?
  • 是的,它将每 30 秒运行一次。退出代码已经存在。成功响应中的 killtrigger.next() 将打破循环。试着告诉我
  • 我还没有让这个工作。我需要一个轮询解决方案,在每次轮询之间插入延迟。我尝试了上面的方法,但意识到计时器可以在上一个响应到来之前触发。
【解决方案2】:

这个解决方案是我在轮询之间延迟轮询时提出的。


/**
 * Fetch search result. At first call backend will create a LongRunningTask plus return
 * a corresponding taskId. We will then poll with given taskId until we have a result.
 */
performSearchg(searchCriterias: string[], search: MalSokningDto[], actor: string): void {
    let milllisBetweenPolls: number = 1000;

    // -- Cancel any previous ongoing search. (taskId => null). --
    this.cancelSearch();

    let stopPoll: Subject<void> = new Subject();

    this.searchService.search(search, this.taskId).pipe(
      expand(_ => timer(milllisBetweenPolls).pipe(
        mergeMap(_ => this.searchService.search(search, this.taskId))
        )
      ),
      takeUntil(stopPoll)
    ).subscribe(
      (response: HttpResponse<ResponseDto>) => {
        switch (response.status) {
          case 200: {
            stopPoll.next();
            this.loading = false;
            this.showSearchResult(searchCriterias, response.body, actor, null);
            break;
          }
          case 202: {
            this.taskId = response.headers.get('X-TaskId');
            break;
          }
          default: {
            stopPoll.next();
            this.loading = false;
            this.showSearchResult(searchCriterias, null, actor, new Error(Constants.SEARCH_ERROR));
            break;
          }
        }
      },
      (error: Error) => {
        this.loading = false;
        slutaPolla.next();
        this.showSearchResult(searchCriterias, null, aktor, error);
      }
    );
  }




【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-10
    相关资源
    最近更新 更多