【发布时间】:2019-05-22 18:41:11
【问题描述】:
我正在尝试创建一个 Observable,它不断地查询外部服务以获取更新,如果有新服务,则发出更新:
this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
let shouldLoop = true;
while (shouldLoop)
{
if (!this._client)
throw new Error("This client is not initialised.");
const update = this._lib.receiveSync(this._client, 5);
if (!update)
continue;
if (update._ === "error")
this.emit("error", update);
else
this.emit("update", update);
subscriber.next(update);
}
// never gets here b/c of while loop, so subscribing to this Observable
// causes everything to block
// cancellation logic
return () =>
{
shouldLoop = false;
this._loop = null;
};
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;
this._loopSubscription = this._loop.connect();
但是,订阅功能是阻塞的,这意味着当我调用connect() 时我的代码会停止。我如何重写它以使订阅功能非阻塞?
【问题讨论】:
-
subscribe仅在您的new Observable(o => ...)阻塞时阻塞。但是,看起来您可以循环使用setTimeout而不是while
标签: typescript rxjs observable rxjs6