【发布时间】:2018-07-25 10:27:01
【问题描述】:
我正在使用 RxJS 6 创建pipeable operators,但不清楚当操作是异步时如何complete()观察者。
对于同步操作,逻辑很简单。在下面的示例中,来自源 Observable 的所有值都将传递给 observer.next(),然后调用 observer.complete()。
const syncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => observer.next(x),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
但是,对于 异步 操作,我有点不知所措。在下面的示例中,异步操作由对setTimeout() 的调用表示。显然,observer.complete() 将在任何值被传递给observer.next() 之前被调用。
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => setTimeout(() => observer.next(x), 100),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
所以问题是:什么是惯用的 RxJS 方法来实现对 observer.complete() 的调用仅在所有值异步传递给 observer.next() 之后进行?我应该手动跟踪待处理的呼叫还是有更“被动”的解决方案?
(请注意,上面的示例是对我的实际代码的简化,并且对 setTimeout() 的调用旨在表示“任何异步操作”。我正在寻找一种处理异步的通用方法管道操作符中的操作,而不是关于如何处理 RxJS 中的延迟或超时的建议。)
【问题讨论】:
-
我不明白问题出在哪里。如果你想在
nexts 之后调用complete(),然后将complete调用放入next处理程序中,并添加一些你需要的额外逻辑。 -
@martin 是的,这就是我在问题“我应该手动跟踪待处理呼叫吗?”时所指的方法。。如果没有其他惯用的替代方法,那我最终会这样做。
-
最惯用、最通用和可组合的方式是@Picci 在他的回答中提出的 - 不要从头开始创建新的 observable 并订阅它。相反,只需使用内置运算符,让他们为您完成所有订阅管理。如果由于某种原因这对您不起作用,请提供有关您的特定问题的更多详细信息。
标签: javascript asynchronous rxjs rxjs-pipeable-operators