【发布时间】:2016-07-26 23:28:16
【问题描述】:
我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件.
这个卵石图可能是错误的,但这是我想要的:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
这是我目前的代码:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
这段代码有两个问题:
- 它使用了
inProgressCountvar,它用 side 更新 效果函数。 当我从受控流中请求超过 1 个项目时,仅调用一次 done$ 订阅。这使得inProgressCountvar 更新不正确,这最终将队列限制为一次一个。
您可以在这里看到它的工作原理: http://jsbin.com/wivehonifi/1/edit?js,console,output
问题:
- 有更好的方法吗?
- 如何摆脱
inProgressCount变量? 为什么在请求多个项目时 done$ 订阅只被调用一次?
更新:
对问题 #3 的回答:switchMap 与 flatMapLatest 相同,这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。
【问题讨论】:
标签: javascript rxjs backpressure