【发布时间】:2019-08-23 12:58:33
【问题描述】:
我有一个以 1Hz 发射的流。有时,发射的项目之间会有几秒钟的延迟,比如说 10 秒。我想创建一个订阅源的可观察对象,并且每次项目之间的延迟太长(例如 5 秒)时,它都会发出另一种类型的项目。但是,当源再次发出正常值时,它应该发出源。
-O-O-O-O-O----------O-O-O-O---|---> source
-O-O-O-O-O----X-----O-O-O-O---|---> observable
我想,在这种情况下我可以使用timeoutWith(delay,of(X)),但这会取消订阅源,丢失流的其余部分。
当我使用switchMap(O => of(O).timeoutWith(delay, of(x)) 获得一次性Os 流时,它不会超时,因为尚未创建内部可观察对象。
有什么想法吗?
最终解决方案
这就是解决方案,到底是我需要的:
this.sensorChanged
.pipe(
mapTo(SensorEvent.SIGNAL_FOUND),
startWith(PositioningEvent.SIGNAL_UNAVAILABLE),
switchMap(x => concat(of(x), timer(5000).pipe(mapTo(PositioningEvent.SIGNAL_LOST)))),
distinctUntilChanged()
)
缺少的链接是阻止 switchMap 发射的 startWith()。
【问题讨论】:
标签: rxjs