【发布时间】:2015-11-20 20:49:10
【问题描述】:
我需要将追赶和订阅新提要结合起来。所以首先我在数据库中查询我错过的所有新记录,然后切换到一个 pub sub 来获取所有新的记录。
第一部分很容易进行查询,可能以 500 个为一组,这将为您提供一个数组,您可以 rx.observeFrom 那。
第二部分很简单,你只需在 pubsub 上放一个 rx.observe。
但我需要做的是按顺序播放,所以我需要先播放所有旧唱片,然后再开始播放新唱片。
我想我可以开始订阅 pubsub,将它们放在一个数组中,然后开始处理旧的,当我完成后,要么删除 dup(或者因为我做了 dup 检查)允许少数 dup,但是播放累积的记录,直到它们消失,然后一进一出。
我的问题是最好的方法是什么?我应该创建一个订阅以开始在数组中建立新记录,然后开始处理旧记录,然后在 oldrecord 进程的“then”中订阅另一个数组吗?
好的,这就是我目前所拥有的。我需要建立测试并完成一些伪代码来确定它是否有效,更不用说一个好的实现了。在我埋葬自己之前,请随意阻止我。
var catchUpSubscription = function catchUpSubscription(startFrom) {
EventEmitter.call(this);
var subscription = this.getCurrentEventsSubscription();
// calling map to start subscription and catch in an array.
// not sure if this is right
var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x);
// getPastEvents gets batches of 500 iterates over and emits each
// till no more are returned, then resolves a promise
this.getPastEvents({count:500, start:startFrom})
.then(function(){
rx.Observable.fromArray(events).forEach(x=> emit('event', x));
});
};
我不知道这是最好的方法。有什么想法吗?
谢谢
【问题讨论】:
标签: arrays node.js functional-programming rxjs