【发布时间】:2021-09-14 03:24:38
【问题描述】:
我有一个通过fromEventPattern 传递的事件流,如下所示:
fromEventPattern<IPsEvent>(addEventHandler).subscribe(ps$);
由于业务怪癖,我预计有时会引发异常,此时我想将事件排队并在错误状态解决后重新触发。
我一直在尝试Pausable buffer with RxJS 的解决方案,但无济于事。我认为这是因为他们能够通过单独的 observable 切换,而这有点要求在中途暂停自己。在链接的示例中,我有blockingCallsAllowed$ 而不是autoSave$。这是我最近的尝试:
const source$ = new Subject<IPsEvent>();
const blockingCallsAllowed$ = new BehaviorSubject(true);
const on$ = blockingCallsAllowed$.pipe(filter((v) => v));
const off$ = blockingCallsAllowed$.pipe(filter((v) => !v));
source$
.pipe(
map(() => {
try {
// line will throw exception at certain times
myFunction();
return true;
} catch (e) {
const i = setInterval(() => {
try {
myFunction();
console.log('good again');
blockingCallsAllowed$.next(true);
clearInterval(i);
} catch (er) {
// still in flux
}
}, 50);
return false;
}
}),
)
.subscribe(blockingCallsAllowed$);
const output$ = merge(
source$.pipe(bufferToggle(off$, () => on$)),
source$.pipe(windowToggle(on$, () => off$)),
).pipe(concatMap(from));
output$.subscribe((evt) => {
console.log('After buffers', evt);
});
// Add events from the Ps API to the event stream
fromEventPattern(addEventHandler).subscribe(source$);
在第一个异常之前一切正常,然后它永远不会输出它缓冲的内容,即使它在console.log 中再次触发一切都很好。
我认为在同一执行中依赖 source$.pipe 以及稍后使用 .next 运行的间隔存在一些时间问题。尽管在这段代码的许多不同排列之后仍然无法确定它。
【问题讨论】:
标签: rxjs