【问题标题】:How to create an observable based on a while loop?如何基于 while 循环创建可观察对象?
【发布时间】:2019-05-22 18:41:11
【问题描述】:

我正在尝试创建一个 Observable,它不断地查询外部服务以获取更新,如果有新服务,则发出更新:

this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
    let shouldLoop = true;

    while (shouldLoop)
    {
        if (!this._client)
            throw new Error("This client is not initialised.");

        const update = this._lib.receiveSync(this._client, 5);

        if (!update)
            continue;

        if (update._ === "error")
            this.emit("error", update);
        else
            this.emit("update", update);

        subscriber.next(update);
    }

    // never gets here b/c of while loop, so subscribing to this Observable
    // causes everything to block

    // cancellation logic
    return () =>
    {
        shouldLoop = false;
        this._loop = null;
    };
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;

this._loopSubscription = this._loop.connect();

但是,订阅功能是阻塞的,这意味着当我调用connect() 时我的代码会停止。我如何重写它以使订阅功能非阻塞?

【问题讨论】:

  • subscribe 仅在您的 new Observable(o =&gt; ...) 阻塞时阻塞。但是,看起来您可以循环使用 setTimeout 而不是 while

标签: typescript rxjs observable rxjs6


【解决方案1】:

感谢@martin,解决方案非常明显。我不知道为什么我没有想到这个?

this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
    let shouldLoop = true;

    process.nextTick(() =>
    {
        while (shouldLoop)
        {
            if (!this._client)
                throw new Error("This client is not initialised.");

            const update = this._lib.receiveSync(this._client, 5);

            if (!update)
                continue;

            if (update._ === "error")
                this.emit("error", update);
            else
                this.emit("update", update);

            subscriber.next(update);
        }
    });

    // cancellation logic
    return () =>
    {
        shouldLoop = false;
        this._loop = null;
    };
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-10-06
    • 1970-01-01
    • 1970-01-01
    • 2017-07-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多