【问题标题】:How does the RxJs 5 share() operator work?RxJs 5 share() 操作符是如何工作的?
【发布时间】:2016-05-10 13:33:10
【问题描述】:

对我来说,RxJs 5 share() 操作员的工作原理并不是 100% 清楚,请参阅此处的 latest docs。 Jsbin 为问题here.

如果我用一系列 0 到 2 创建一个 observable,每个值间隔一秒:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

如果我为这个 observable 创建两个订阅者:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

我在控制台中得到了这个:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

我以为每个订阅都会订阅同一个 Observable,但似乎并非如此!就像订阅的行为创建了一个完全独立的 Observable!

但是如果share()操作符被添加到源observable中:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

然后我们得到这个:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

如果没有share(),这就是我所期望的。

这里发生了什么,share()operator 是如何工作的?每个订阅是否都会创建一个新的 Observable 链?

【问题讨论】:

    标签: javascript rxjs rxjs5


    【解决方案1】:

    如果满足这两个条件,share 会使 observable “热”:

    1. 订阅人数>0
    2. 并且 observable 尚未完成

    场景 1:订阅者数量 > 0 并且 observable 在新订阅之前未完成

    var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };
    
    var observer1 = shared.subscribe(log('observer1')),
        observer2;
    
    setTimeout(()=>{
        observer2 = shared.subscribe(log('observer2'));
    }, 3000);
    
    // emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
    // another emission for both observers at: startTime + 10 seconds
    

    场景 2:在新订阅之前订阅者数量为零。变得“冷”

     var shared  = rx.Observable.interval(5000).take(2).share();
        var startTime = Date.now();
        var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };
    
    var observer1 = shared.subscribe(log('observer1')),
        observer2;
    
    setTimeout(()=>{
        observer1.unsubscribe(); 
    }, 1000);
    
    setTimeout(()=>{
        observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
    }, 3000);
    // observer2's onNext is called at startTime + 8 seconds
    // observer2's onNext is called at startTime + 13 seconds
    

    场景 3:当 observable 在新订阅之前完成时。变得“冷”

     var shared  = rx.Observable.interval(5000).take(2).share();
        var startTime = Date.now();
        var log = (x) => (value) => { 
            console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
        };
    
    var observer1 = shared.subscribe(log('observer1')),
        observer2;
    
    setTimeout(()=>{
        observer2 = shared.subscribe(log('observer2'));
    }, 12000);
    
    // 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
    // 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs
    

    【讨论】:

    • 将场景 1 和场景 2 设置为 6000 次超时对于“共享”运算符来说会更直观。
    【解决方案2】:

    请注意您使用的是 RxJS v5,而您的文档链接似乎是 RxJS v4。我不记得具体细节,但我认为 share 运营商经历了一些变化,特别是在完成和重新订阅方面,但不要相信我的话。

    回到您的问题,正如您在研究中所表明的,您的期望与图书馆设计不符。 Observables 懒惰地实例化它们的数据流,具体地在订阅者订阅时启动数据流。当第二个订阅者订阅相同的 observable 时,会启动另一个新的数据流,就像它是第一个订阅者一样(所以是的,每个订阅都会创建一个新的 observable 链,如您所说)。这就是在 RxJS 术语中创造的冷可观察对象,这是 RxJS 可观察对象的默认行为。如果你想要一个 observable,它在数据到达的那一刻将其数据发送给它拥有的订阅者,这就是一个 hot observable,获得 hot observable 的一种方法是使用share 运算符。

    您可以在这里找到图示的订阅和数据流:Hot and Cold observables : are there 'hot' and 'cold' operators?(这对 RxJS v4 有效,但大部分对 v5 有效)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-09-08
      • 2017-06-26
      • 1970-01-01
      • 1970-01-01
      • 2016-11-06
      • 2021-12-01
      • 2021-04-21
      • 1970-01-01
      相关资源
      最近更新 更多