【问题标题】:RXJS concatMap should also lazy pull?RXJS concatMap 也应该懒拉?
【发布时间】:2017-02-25 21:46:00
【问题描述】:

如果我使用范围并使用 takeWhile 限制输出:

Rx.Observable
    .range( 1, 10 )
    .do( idx => console.log('range: ', idx ))
    .takeWhile( idx => idx <= 5 )
    .subscribe( idx => console.log( 'res   : ', idx ))
    ;

输出是:

range:  1
res   :  1
range:  2
res   :  2
range:  3
res   :  3
range:  4
res   :  4
range:  5
res   :  5
range:  6

按范围生成的值并非全部消耗。 6被拉取,不通过takeWhile,不再取值。

现在,如果我在两者之间有一个 concatMap:

Rx.Observable
    .range( 1, 10 )
    .do( idx => console.log('range: ', idx ))
    .concatMap( idx => {
        var res = new Rx.Subject<number>();
        setTimeout( () => {
            res.next( idx );
            res.complete();
        }, 10 );
        return res;
    })
    .takeWhile( idx => idx <= 5 )
    .subscribe( idx => console.log( 'res:   ', idx ))
    ;

输出是这样的:

range:  1
range:  2
range:  3
range:  4
range:  5
range:  6
range:  7
range:  8
range:  9
range:  10
res:    1
res:    2
res:    3
res:    4
res:    5

我希望,范围生产的价值也会在这里受到限制。 concatMap 保留了顺序,因此只有在前一个 observable 完成时才拉下一个值。但所有范围错误都被拉出。
这是一个错误吗?或者真正的行为是什么。能不能帮忙理解一下。

【问题讨论】:

    标签: rxjs rxjs5


    【解决方案1】:

    range() 产生的值被缓存在concatMap() operator 内部,然后被一一拉取。然后您使用setTimeout() 异步发出值。操作员通常不会尝试使用任何背压,因此源中的所有项目都在准备好时被发射。

    请注意,即使使用Observable.of() 和异步调度程序Scheduler.async,您也可以获得相同的效果。这使得来自Observable.of 的发射发生在一个新的事件中,使其异步。

    const Observable = Rx.Observable;
    const Scheduler = Rx.Scheduler;
    
    Rx.Observable
        .range( 1, 10 )
        .do( idx => console.log('range: ', idx ))
        .concatMap( idx => Observable.of(idx, Scheduler.async))
        .takeWhile( idx => idx <= 5 )
        .subscribe( idx => console.log( 'res:   ', idx ));
    

    观看现场演示:https://jsbin.com/xukolo/3/edit?js,console

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-09-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多