尝试了所有 Rxjs 5 缓冲区变体,特别是每 n 秒发出空或不发射的 bufferTime,我最终滚动了自己的 bufferTimeLazy:
function bufferTimeLazy(timeout) {
return Rx.Observable.create(subscriber => {
let buffer = [], hdl;
return this.subscribe(res => {
buffer.push(res);
if (hdl) return;
hdl = setTimeout(() => {
subscriber.next(buffer);
buffer = [];
hdl = null;
}, timeout);
}, err => subscriber.error(err), () => subscriber.complete());
});
};
// add operator
Rx.Observable.prototype.bufferTimeLazy = bufferTimeLazy;
// example
const click$ = Rx.Observable.fromEvent(document, 'click');
click$.bufferTimeLazy(5000).subscribe(events => {
console.log(`received ${events.length} events`);
});
示例:
https://jsbin.com/nizidat/6/edit?js,console,output
这个想法是在缓冲区中收集事件并在第一个事件后 n 秒发出缓冲区。一旦发出,清空缓冲区并保持休眠状态,直到下一个事件到达。
如果您不想在 Observable.prototype 中添加操作符,只需调用函数即可:
bufferTimeLazy.bind(source$)(5000)
编辑:
好的,所以 Rxjs 5 还不错:
var clicks = Rx.Observable.fromEvent(document, 'click').share();
var buffered = clicks.bufferWhen(() => clicks.delay(5000));
buffered.subscribe(x => console.log(`got ${x.length} events`));
达到同样的效果。注意 share() 以避免重复点击订阅 - YMMV。