【问题标题】:rx.js catchup subscription from two sourcesrxjs 从两个来源赶上订阅
【发布时间】:2015-11-20 20:49:10
【问题描述】:

我需要将追赶和订阅新提要结合起来。所以首先我在数据库中查询我错过的所有新记录,然后切换到一个 pub sub 来获取所有新的记录。

第一部分很容易进行查询,可能以 500 个为一组,这将为您提供一个数组,您可以 rx.observeFrom 那。

第二部分很简单,你只需在 pubsub 上放一个 rx.observe。

但我需要做的是按顺序播放,所以我需要先播放所有旧唱片,然后再开始播放新唱片。

我想我可以开始订阅 pubsub,将它们放在一个数组中,然后开始处理旧的,当我完成后,要么删除 dup(或者因为我做了 dup 检查)允许少数 dup,但是播放累积的记录,直到它们消失,然后一进一出。

我的问题是最好的方法是什么?我应该创建一个订阅以开始在数组中建立新记录,然后开始处理旧记录,然后在 oldrecord 进程的“then”中订阅另一个数组吗?

好的,这就是我目前所拥有的。我需要建立测试并完成一些伪代码来确定它是否有效,更不用说一个好的实现了。在我埋葬自己之前,请随意阻止我。

var catchUpSubscription = function catchUpSubscription(startFrom) {
    EventEmitter.call(this);
    var subscription = this.getCurrentEventsSubscription();
    // calling map to start subscription and catch in an array.  
    // not sure if this is right
    var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x);
    // getPastEvents gets batches of 500 iterates over and emits each 
   // till no more are returned, then resolves a promise
    this.getPastEvents({count:500, start:startFrom})
    .then(function(){
        rx.Observable.fromArray(events).forEach(x=> emit('event', x));
    });
};

我不知道这是最好的方法。有什么想法吗?

谢谢

【问题讨论】:

    标签: arrays node.js functional-programming rxjs


    【解决方案1】:

    我会避免不必要地混合不同的异步策略。您可以使用concat 将两个序列连接在一起:

    var catchUpSubscription = function catchUpSubscription(startFrom) {
        var subscription = this.getCurrentEventsSubscription();
    
        return Rx.Observable.fromPromise(this.getPastEvents({count:500, start:startFrom}))
         .flatMap(x => x)
         .concat(Rx.Observable.fromEvent(subscription, 'event'));
    
    };
    
    ///Sometime later
    catchUpSubscription(startTime).subscribe(x => /*handle event*/)
    

    【讨论】:

    • 啊,很好,这就是我想要的。几个问题。 getpastevents,循环直到没有更多的事件。我需要查看 db 查询的结果,看看是否有任何返回。如果我返回承诺,我想我不确定如何管理它。其次,我是在创建订阅时还是在订阅之后才开始收集新事件?我不能冒险错过追赶和现场阅读之间的事件。
    • 现在我想起来了,第一部分不好,因为它会不断收集新事件。废话。
    猜你喜欢
    • 1970-01-01
    • 2018-09-05
    • 1970-01-01
    • 2018-09-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多