【问题标题】:publishing observable to make it *hot*发布 observable 使其*热*
【发布时间】:2016-12-27 00:13:03
【问题描述】:

我正在尝试创建一个不需要订阅的热门 observable。这是一个库,我想让用户能够调用某些方法并避免调用 subscribe() 来触发 observables。最初我有这个:

const q = new Queue();

q.add('foo bar baz').subscribe();   // <<< need to call subscribe


Queue.prototype.add = Queue.prototype.enqueue = function (lines) {

    lines = _.flattenDeep([lines]);

    var lockAcquired = false;

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
        })
        .flatMap(() => {
            lockAcquired = true;
            return appendFile(this, lines)
        })
        .flatMap(() => releaseLock(this))
        .catch(err => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return makeGenericObservable();
            }
        })


};

为了让 observable 变热,我想我可以这样做:

const q = new Queue();

q.add('foo bar baz');  // <<< don't call subscribe

Queue.prototype.add = Queue.prototype.enqueue = function (lines) {

    lines = _.flattenDeep([lines]);

    var lockAcquired = false;

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
        })
        .flatMap(() => {
            lockAcquired = true;
            return appendFile(this, lines)
        })
        .flatMap(() => releaseLock(this))
        .catch(err => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return makeGenericObservable();
            }
        })
        .publish()
        .share()  // this too?

};

但是问题是当我调用publish() 时,什么都没有发生,并且 add 方法似乎永远不会被完全调用(我假设序列中的第一个 observable 根本不会触发,因为没有调用有效的 subscribe)。但我虽然publish() 会自动调用可观察链?

如何让 add 方法返回的 observable,hot

【问题讨论】:

    标签: node.js rxjs rxjs5


    【解决方案1】:

    你误会了。 Hot observable确实需要像冷订阅一样订阅。不同之处在于 hot 使用一些外部生产者(如圆顶元素)并开始在订阅时收听它。另一方面,冷可观察在订阅时在内部创建生产者。

    这导致您可能会错过一些带有 hot observable 的事件,因为外部生产者对订阅一无所知并且独立发出。使用冷可观察您不会错过任何东西,因为生产者是在订阅时创建的。

    长话短说,您可以在热或冷可观察对象之上构建任何可观察对象链,但除非您订阅它,否则不会发生任何事情。

    PS。无需将publishshare 一起使用,因为后者是alias for .publish().refCount()

    【讨论】:

    • "不同之处在于 hot 使用一些外部生产者(如圆顶元素)并开始在订阅时收听它。另一方面,cold observable 在订阅时在内部创建生产者。" - 这是不正确的。 - observable 的来源与 observable 的热或冷无关 - 唯一重要的是流是否在订阅之前已经运行,这通常由一群操作员完成,例如share, publish, publishReplay, ...,见:github.com/Reactive-Extensions/RxJS/blob/master/doc/…
    • 这只是 很热 的另一个词 - 但我想指出,hot-/cold-state 取决于您在第一段中描述的 source/producer - 使用外部生产者的 observable 也可以是冷的,而产生值本身的 observable 也可以是热。
    • @olsn,你能给我举一个冷外部生产者的例子吗?
    • 另外:“Hot observable 确实需要像冷的订阅一样” - 这也不是 100% 准确 - 如果您使用 .publish().connect(),流将自行执行不需要额外的订阅者 - 这可能不是一个非常常见的情况,但绝对有可能在不使用 .subscribe 的情况下执行流(也许值得一提,connect 实际上是内部的subscribe
    • @olsn,您提供的文档链接显示“”...这与诸如鼠标移动事件或 ... 之类的热可观察对象不同。因此它处理鼠标事件可观察为热,而在您的 jsBin 中,您将鼠标事件视为冷。为什么会这样?
    【解决方案2】:

    如果您不想使用subscribe,您可以在您的方法中手动.connect()subscribe 它:

    const q = new Queue();
    q.add('foo bar baz');
    
    Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
        lines = _.flattenDeep([lines]);
        var lockAcquired = false;
    
        let add$ = this.init()
            .flatMap(() => {
                return acquireLock(this)
            })
            .flatMap(() => {
                lockAcquired = true;
                return appendFile(this, lines)
            })
            .flatMap(() => releaseLock(this))
            .catch(err => {
                if (lockAcquired) {
                    return releaseLock(this);
                }
                else {
                    return makeGenericObservable();
                }
            })
            .publish();
    
         add$.connect();
         return add$;  // optional, depends if you even need the stream outside of the add-method
    };
    

    或者作为使用内部subscribe的替代方案:

    const q = new Queue();
    q.add('foo bar baz');  // <<< don't call subscribe
    
    Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
        lines = _.flattenDeep([lines]);
        var lockAcquired = false;
        let add$ = this.init()
            ...
            .share();
    
         add$.subscribe();
         return add$;
    };
    

    【讨论】:

    • 为什么发布与连接一起,分享与订阅?
    • publish 返回一个 ConnectableObservable,它需要 connected 才能工作,您仍然可以订阅,但每个订阅共享相同的流 - share 只是 .publish().refCount() 的别名 - 而 refCount 只是在有 at 时自动连接流至少 1 个订阅者 - 因此,如果您想订阅并自动处理连接或自己管理 connect,则区别在于您的具体情况只是您的个人偏好。
    猜你喜欢
    • 2021-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多