【问题标题】:How to process RxJS stream n items at a time and once an item is done, autofill back to n again?如何一次处理 RxJS 流 n 个项目,一旦项目完成,再次自动填充回 n?
【发布时间】:2016-07-26 23:28:16
【问题描述】:

我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件.

这个卵石图可能是错误的,但这是我想要的:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p------------->  //In Progress
-------d--d--------d-d-dd------dddd-------->  //Promise Done

---1--21-2-34-----------3----4-3210--------   //QUEUE SIZE CHANGES

这是我目前的代码:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$      
  .tap(() => inProgressCount++)
  .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
  inProgressCount--;
  inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

这段代码有两个问题:

  1. 它使用了inProgressCount var,它用 side 更新 效果函数。
  2. 当我从受控流中请求超过 1 个项目时,仅调用一次 done$ 订阅。这使得inProgressCount var 更新不正确,这最终将队列限制为一次一个。

您可以在这里看到它的工作原理: http://jsbin.com/wivehonifi/1/edit?js,console,output

问题:

  1. 有更好的方法吗?
  2. 如何摆脱inProgressCount 变量?
  3. 为什么在请求多个项目时 done$ 订阅只被调用一次?

更新:
对问题 #3 的回答:switchMap 与 flatMapLatest 相同,这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。

【问题讨论】:

    标签: javascript rxjs backpressure


    【解决方案1】:

    您实际上根本不需要使用背压。有一个名为flatMapWithMaxConcurrent 的运算符会为您执行此操作。它本质上是调用.map().merge(concurrency) 的别名,它一次只允许最大数量的流在运行。

    我在这里更新了你的 jsbin:http://jsbin.com/weheyuceke/1/edit?js,output

    但我在下面注释了重要的一点:

    const concurrency = 4;
    
    var done$ = events$
      //Only allows a maximum number of items to be subscribed to at a time
      .flatMapWithMaxConcurrent(concurrency, 
        ({timestamp}) =>   
          //This overload of `fromPromise` defers the execution of the lambda
          //until subscription                    
          Rx.Observable.fromPromise(() => { 
            //Notify the ui that this task is in progress                                 
            updatePanelAppend(inProgress, timestamp);
            removeFromPanel(pending, timestamp);
            //return the task
            return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
         }));
    

    【讨论】:

    • 在 RxJs 5 中,您必须将 .flatMapWithMaxConcurrent 替换为 .mergeMap(mapper, (resultSelectorFunction | null), concurrency) mergeMap
    猜你喜欢
    • 1970-01-01
    • 2011-10-20
    • 1970-01-01
    • 2018-04-04
    • 2018-06-16
    • 1970-01-01
    • 2014-11-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多