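【Posted】: 2022-01-11 13:47:29
【Problem description】:
I'm developing an HTTP buffer that collects HTTP requests and sends them either once a certain time has passed since the first request or once the buffer limit is reached. So it has to proceed in the following cases:
- when the number of calls is > 25
- x milliseconds after the first buffered call, but only once the token observer === 'valid'.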
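The two release conditions above can be sketched as a small, dependency-free state machine. This is only an illustration of the rule, not the RxJS pipeline from this post: `BatchGate`, `TokenState`, `add`, `poll` and `setToken` are hypothetical names, time is passed in explicitly, and the sketch assumes the size limit releases a batch regardless of the token state (sending could still wait for the token afterwards).

```typescript
// Hypothetical sketch of the release rule; none of these names come from the post.
type TokenState = 'valid' | 'invalid';

class BatchGate<T> {
  private items: T[] = [];
  private firstAt: number | null = null; // time the first pending item arrived
  private token: TokenState = 'invalid';
  private readonly maxItems: number;     // release as soon as this many items are pending
  private readonly delayMs: number;      // earliest timed release after the first item

  constructor(maxItems: number, delayMs: number) {
    this.maxItems = maxItems;
    this.delayMs = delayMs;
  }

  /** Buffer an item; returns a batch if the size limit was reached, else null. */
  add(item: T, now: number): T[] | null {
    if (this.items.length === 0) this.firstAt = now;
    this.items.push(item);
    return this.items.length >= this.maxItems ? this.flush() : null;
  }

  /** Record the token state; may release a batch whose delay has already elapsed. */
  setToken(state: TokenState, now: number): T[] | null {
    this.token = state;
    return this.poll(now);
  }

  /** Timed release: the delay since the first item has elapsed AND the token is valid. */
  poll(now: number): T[] | null {
    if (this.firstAt === null) return null;
    const delayElapsed = now - this.firstAt >= this.delayMs;
    return delayElapsed && this.token === 'valid' ? this.flush() : null;
  }

  /** Hand out the pending items and reset the gate. */
  private flush(): T[] {
    const batch = this.items;
    this.items = [];
    this.firstAt = null;
    return batch;
  }
}
```

Because the arrival time of the first pending item is stored with the buffer itself, an item can never be left in a buffer that has no running deadline.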
After some trial and error, I came up with this:
this.batcherObservable = new Subject<BatchItem>();
this.batcherObservable.pipe(
  tap(req => console.log(req.request.url)),
  bufferWhen(() => {
    // Buffer has opened
    const bufferStopt = new Subject()
    const limiter = this.batcherObservable.pipe(takeUntil(bufferStopt), bufferCount(26), take(1))
    const timer = this.batcherObservable.pipe(takeUntil(bufferStopt), take(1), delay(this.options.batchDelay))
    const tokensValid = this.batcherObservable.pipe(takeUntil(bufferStopt), take(1), delayWhen(() => this.tokenStatus.pipe(filter(state => state === 'valid'), take(1))))
    return race(limiter, forkJoin([timer, tokensValid])).pipe(tap( () => {
      bufferStopt.complete()
    }))
  }),
  filter(requests => requests.length > 0),
  tap(req => console.log(req.length)),
  delayWhen(() => this.tokenStatus.pipe(filter(state => state === 'valid'), take(1))),
).subscribe(async requests => {
  const useBatch = requests.length > 1
  let request: Request = !useBatch ? requests[0].request : await this.#renderCallsToBatchCall(<[BatchItem, ...BatchItem[]]>requests)
  let response: globalThis.Response
  try {
    response = await firstValueFrom(this.#sendRequest(request))
  } catch (error) {
    console.error('error in batch', error)
    if (error instanceof globalThis.Response) {
      response = await error.json()
    } else {
      console.error(error)
      return new Error('Error when requesting')
    }
  }
  if (!useBatch) {
    if(response.ok && (response.status >= 200 && response.status <= 299)) return requests[0].responseObserver.next(response)
    else return requests[0].responseObserver.error(response)
  }
  const batchBody: FlatResponse[]|CWHttpError = await response.json()
  // We have to flat out the batchcall and make our own responses
  requests.forEach((batchItem, index) => {
    if (Array.isArray(batchBody)) {
      const {body, headers, status, message} = batchBody[index]
      const url = requests[index].request.url
      response = new Response(JSON.stringify(body || {}), {headers, statusText: message, status: parseInt(status)})
      // URL is private of Response. This is the only way I can set the url
      Object.defineProperty(response, 'url', { value: url});
      if (response.ok) {
        batchItem.responseObserver.next(response)
      } else {
        batchItem.responseObserver.error(body)
      }
      batchItem.responseObserver.complete()
    } else {
      batchItem.responseObserver.error(response)
    }
  })
});
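Problem: I can see that it fires the request when there are 25 calls, but when I make 26 calls, the 26th call does not trigger the buffer. It stays stuck in the buffer. 25 or fewer calls and 27 or more calls work perfectly. Can anyone see what is wrong with my implementation?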
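One possible explanation, offered as an assumption rather than a confirmed diagnosis: if `bufferWhen` subscribes the closing notifier to the subject before it subscribes its own buffering handler (believed to be the case in RxJS 7, but worth checking against the installed version), then on the 26th call `limiter` closes the buffer before that value has been stored. The value then lands in the next buffer, whose `timer` subscribed too late to see it, so nothing starts the delay until a 27th call arrives. The plain-TypeScript model below imitates only that ordering; `MiniSubject` and `runModel` are hypothetical names, not RxJS APIs.

```typescript
// Plain-TypeScript model of the suspected subscription ordering (not RxJS itself).
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  /** Register a listener; returns a function that removes it again. */
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    // Snapshot: listeners added during this emission do not receive `value`.
    this.listeners.slice().forEach(listener => listener(value));
  }
}

function runModel(total: number, limit: number) {
  const source = new MiniSubject<number>();
  const emitted: number[][] = [];
  let buffer: number[] = [];
  let timerStarted = false;

  const openBuffer = (): void => {
    timerStarted = false;
    let seen = 0;
    const stop = source.subscribe(() => {
      timerStarted = true;   // stands in for take(1) + delay(...)
      seen += 1;
      if (seen === limit) {  // stands in for bufferCount(limit)
        stop();
        emitted.push(buffer); // close the current buffer
        buffer = [];
        openBuffer();         // new notifier misses the value being emitted
      }
    });
  };

  openBuffer();                                         // notifier subscribes first
  source.subscribe(value => { buffer.push(value); });   // buffering subscribes second
  for (let i = 1; i <= total; i++) source.next(i);
  return { emitted, pending: buffer, timerStarted };
}
```

Under this model, `runModel(26, 26)` emits one batch of 25 items and leaves the 26th item pending with no timer running, while `runModel(27, 26)` leaves two items pending with the timer started by the 27th value. That matches the reported symptom, although it does not prove the real pipeline behaves the same way.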
【Discussion】:
Tags: typescript rxjs