【问题标题】:RXJS buffer with time, count and tokenState带有时间、计数和 tokenState 的 RXJS 缓冲区
【发布时间】:2022-01-11 13:47:29
【问题描述】:

我正在开发一个 http 缓冲区,该缓冲区可缓冲 HTTP 请求,并在自第一次请求后等待一段时间或达到缓冲区限制后发送它们。所以它必须在以下情况下继续:

  • 当通话次数>25时
  • 在第一次缓冲区调用后 x 毫秒后,但要等到令牌观察者 === 'valid'。

经过反复试验,我开发了这个:

this.batcherObservable = new Subject<BatchItem>();
this.batcherObservable.pipe(
  tap(req => console.log(req.request.url)),
  bufferWhen(() => {
    // Buffer has opened
    const bufferStopt = new Subject()
    const limiter = this.batcherObservable.pipe(takeUntil(bufferStopt), bufferCount(26), take(1))
    const timer = this.batcherObservable.pipe(takeUntil(bufferStopt), take(1), delay(this.options.batchDelay))
    const tokensValid = this.batcherObservable.pipe(takeUntil(bufferStopt), take(1), delayWhen(() => this.tokenStatus.pipe(filter(state => state === 'valid'), take(1))))
    return race(limiter, forkJoin([timer, tokensValid])).pipe(tap( () => {
      bufferStopt.complete()
    }))
  }),
  filter(requests => requests.length > 0),
  tap(req => console.log(req.length)),
  delayWhen(() => this.tokenStatus.pipe(filter(state => state === 'valid'), take(1))),
).subscribe(async requests  => {
  const useBatch = requests.length > 1

  let request: Request = !useBatch ? requests[0].request : await this.#renderCallsToBatchCall(<[BatchItem, ...BatchItem[]]>requests)
  let response: globalThis.Response
  try {
    response = await firstValueFrom(this.#sendRequest(request))
  } catch (error) {
    console.error('error in batch', error)
    if (error instanceof globalThis.Response) {
      response = await error.json()
    } else {
      console.error(error)
      return new Error('Error when requesting')
    }
  }

  if (!useBatch) {
    if(response.ok && (response.status >= 200 && response.status <= 299)) return requests[0].responseObserver.next(response)
    else return requests[0].responseObserver.error(response)
  }
  const batchBody: FlatResponse[]|CWHttpError = await response.json()
  
  // We have to flat out the batchcall and make our own responses
  requests.forEach((batchItem, index) => {
    if (Array.isArray(batchBody)) {
      const {body, headers, status, message} = batchBody[index]
      const url = requests[index].request.url
      response = new Response(JSON.stringify(body || {}), {headers, statusText: message, status: parseInt(status)})
      // URL is private of Response. This is the only way I can set the url
      Object.defineProperty(response, 'url', { value: url});
      if (response.ok) {
        batchItem.responseObserver.next(response)
      } else {
        batchItem.responseObserver.error(body)
      }
      batchItem.responseObserver.complete()
    } else {
      batchItem.responseObserver.error(response)
    }
  })
});

问题 我可以看到它触发了 25 次调用的请求,但是当我进行 26 次调用时,它不会在第 26 次调用时触发缓冲区。它一直卡在缓冲区中。 -25 和 27+ 完美运行。有人能看出我的实现有什么问题吗?

【问题讨论】:

    标签: typescript rxjs


    【解决方案1】:

    这将无法在 batcherObservable 的 26 次发射后发射最后一个发射,无论是在其初始化后还是在 bufferWhen 的任何发射后。因此,例如,您可以发出 45 个请求,稍等片刻,然后尝试 26 次快速连续,并且永远不会返回最后一个请求。

    发生的情况是,当您第一次设置流时,bufferWhen 中的逻辑会立即执行,但 bufferWhen 中的逻辑要等到之后才会再次执行在最后一个 bufferWhen 之后来自 batcherObservable 的第一个发射。所以 limitertimertokensValid observable 直到 batcherObservable 发出 NEXT 才会启动。这就是第 27 次发射似乎起作用的原因。

    最简单的方法是让这些 observable 订阅使用 shareReplay 的批处理器版本。但是,这会产生不断重新启动计时器的不利影响。因此,如果您有一段时间没有请求,然后只有一个请求,它将在所需的间隔之前返回(尽管发射之间经过的间隔仍然会更大)。

    this.batcherObservable = new Subject<BatchItem>();
    this.batcherObservableShared = batcherObservable.pipe(shareReplay(1));
    batcherObservable
      .pipe(
        tap((x) => console.log(`buffering ${x}`)),
        bufferWhen(() => {
          // Buffer has opened
          const bufferStopt = new Subject();
          const limiter = batcherObservableShared.pipe(/*... */);
          const timer = batcherObservableShared.pipe(/*... */);
          const tokensValid = batcherObservableShared.pipe(/* .. */);
          return race(limiter, forkJoin([timer, tokensValid])).pipe(
            tap(() => bufferStopt.next())
          );
        }),
        /* ... */
    

    我不喜欢这个解决方案。我从未使用过 bufferWhen,所以也许我的观察可以帮助其他人找到更好的解决方案。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-11-11
      • 1970-01-01
      • 1970-01-01
      • 2015-12-04
      • 2013-09-26
      • 1970-01-01
      • 2023-03-07
      • 1970-01-01
      相关资源
      最近更新 更多