【问题标题】:Wait for chained observables to complete等待链式 observables 完成
【发布时间】:2017-05-22 01:53:19
【问题描述】:

在使用 RxJS5 实现我想要的东西时遇到了一些麻烦 - 我有一个简单的 Observables 链,从 Rx.Observable.interval 开始:

const Rx = require('rxjs');

var i = 0;

const obs = Rx.Observable.interval(100)
    .flatMap(function () {
        return Rx.Observable.timer(Math.ceil(500*Math.random()))
            .map(function(val){
                console.log(' => These should all log first => ', val);
                return i++;
            });
    })
    .take(5)
    .merge()  // this doesn't seem to do what I want to do
    .map(function (val) {
        console.log('all done = > ', val);
    });

obs.subscribe();

上面记录了这个:

 => These should all log first =>  0
all done = >  0
 => These should all log first =>  0
all done = >  1
 => These should all log first =>  0
all done = >  2
 => These should all log first =>  0
all done = >  3
 => These should all log first =>  0
all done = >  4

我正在寻找记录:

 => These should all log first =>  0
 => These should all log first =>  0
 => These should all log first =>  0
 => These should all log first =>  0
 => These should all log first =>  0

all done = >  [0,1,2,3,4]

很明显,我们并没有等待所有的 timer observables 完成,因为你会看到“all done!”记录了很多次,穿插着“这些都应该先记录”。

我怎样才能得到我正在寻找的输出?

通常情况下,我们可以为此使用zip,但zip 的API 不适合这个用例,因为我们没有同时将所有计时器可观察对象放在一个地方!

如果我的问题不够清楚,这里是我想做的类比,我们阻止所有回调,直到我们任意完成并收集所有结果:

const async = require('async');
var i = 0;

async.forever(function(cb){

    process.nextTick(function(){
       console.log('These should all log first');
       const err = i++ === 5;
       cb(err, i);
    });

}, function done(err, results){
    // let's pretend results contains all the i values
    console.log('all done');
});

【问题讨论】:

  • 请注意,如果我们用 takeLast() 代替 merge(),我们将在“这些都应该首先记录”之后记录“全部完成”,这是“正确的”,然后是“全部完成”记录了 50 倍!我希望“全部完成”只记录一次 :)
  • take 不会对它们进行批处理,它只是在看到足够多时结束流。单个流上的merge 根本不做任何事情。看看例如rxmarbles.com/#merge 很好地说明了正在发生的事情。
  • 嗯,既然您正在映射元素,您难道不希望“全部完成”记录与元素一样多的次数吗?基于此代码,我实际上希望看到该地图控制台日志触发的次数与您“接受”的次数一样多。我认为您对 merge 的使用可能没有达到您的预期,因为您没有按照您的想法提供合并(它不只是坐在那里等待所有“已获取”对象,然后一次合并它们)。
  • 问题是否不清楚? :) 我不知道该怎么做,而且必须有比我刚刚发布的初步答案更好的方法。请注意,这只是一个简单的示例,我需要为我的用例提供一个好的解决方案,可以提炼到这个示例问题(我认为)... take(50).something() 应该能够将所有返回的 observable 合并为一个不知何故,但什么是东西()?

标签: javascript node.js rxjs5 angular2-observables


【解决方案1】:

请注意,这基本上会产生我期望看到的日志记录顺序,但我认为这不是正确/最佳的方法:

   const {Observable} = require('rxjs');

    const obs = Observable.interval(100)
        .flatMap(function () {
          return Observable.timer(Math.ceil(500*Math.random()))
              .map(function(val){
                  console.log(' => These should all log first => ', val);
              });
        })
        .take(5)
        .takeLast()  // <<<<<<<<<<
        .map(function () {
            console.log('all done');
        })
        .take(1)    // <<<<<<<<<<

   obs.subscribe();

我认为还有更好的方法来实现这一点。

【讨论】:

  • 您可以将take(50)subscribe 之间的所有内容替换为last(function() { console.log('all done'); }),或者只是将该日志移至订阅的完整回调中。
  • concatMap替换flatMap(除非你的随机定时器完成和返回值的顺序无关紧要),用toArray替换takeLast并删除take(1)跨度>
【解决方案2】:

@jonsharpe 给了我这个答案,它基本上是有效的。这个问题真的可以归结为Rx.Observable.interval,我们可以摆脱Rx.Observable.timer 映射。这是我们的基本答案:

const Rx = require('rxjs');

const obs = Rx.Observable.interval(100)
    .take(5)
    .map(function(v){
        console.log(v);
        return v;
    })
    .reduce(function (prev, curr) {
        return prev.concat(curr);
    },[])
    .last(function (results) {
        return results;
    })
    .map(function(v){
        console.log(v);
    });


obs.subscribe();

不过,我会非常感兴趣的一种方式来做到这一点没有减少。

【讨论】:

  • reduce 是将所有值收集到数组中所必需的。虽然我不明白你为什么需要last,但 reduce 无论如何都会产生单一结果。在 subscribe 中调用 all done 在语义上也更好,而不是在 map 中调用空的 subscribe
  • @jonrsharpe 你可能是对的,但这违背了 reduce 的语义,不是吗:)
  • 即使这不是 OP 想要的,就像你说的“减少”和“最后”的语义很有趣,这是一段有趣的代码。我捕捉到它是为了帮助我对 Rx 的不断理解。
  • @AlexanderMills 对不起,原来我在想scanscan -> last 相当于 reduce
  • @AlexanderMills 不,我是说 scan 是您想要中间结果时使用的(例如,如果您不知道源流是否会 结束并想要部分总和,只要它发出每个项目)。你只想要最终结果,所以reduce 是正确的版本。您可以使用scan(...).last(...) reduce(...),后者显然更整洁。
【解决方案3】:

所以要求是:

  • 让顶层区间运行 N 次。
  • 每个时间间隔都应该返回一个 Y 计时器 observables 数组。
  • Next 应该追踪 N 次,每次都使用一个 Y 可观察的数组。

这样做。这有点天真,但它确实有效。

    let timerArrLength = [ 4, 2, 3 ];
    let svc = Rx.Observable.interval(1000)
        .take(timerArrLength.length)
        .map(function ( index ) {
            let arr = [];
            for ( let i = 0; i < timerArrLength [ index ]; i++ ) {
                arr.push ( Rx.Observable.timer ( 1000 ) );
            }
            return arr;
        });


        svc.subscribe(
            function onNext(v){
                console.log('=> v =>',v);
            },
            function onError(e){
                console.error(e);
            },
            function onComplete(){
                console.log('complete');
            }
        );

【讨论】:

  • 让我测试一下:)
  • 注意我去掉了第一个let var,你不需要,用数组的长度就好了。
  • 嘿蒂姆,我登出的是一堆可观察的,看看你是否可以在一个最终回调中解开所有值。出于蒸馏目的,只需摆脱计时器并使用间隔即可。
  • 看看我更新的来自 jon 的更新答案
  • 正确,根据所述要求,尽管我认为这就是您要寻找的东西(不知道为什么,但这似乎是您所要求的)。我会看看我能做什么,不过是喝咖啡的时间。也许你可以以此为起点。我认为将 .concat() 和 .last() 与我在这里的代码一起使用可能会奏效。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-12-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-22
  • 2020-10-04
  • 2012-10-31
  • 2020-07-04
相关资源
最近更新 更多