【问题标题】:RxJS: MergeMap with Preserving Input orderRxJS:保留输入顺序的 MergeMap
【发布时间】:2019-03-07 02:33:35
【问题描述】:

要求:

urls = [url1, url2, url3]

并行触发所有 3 个 url 并按照 url 列表的顺序绘制 Dom

 ex: Finished order of urls = [url3, url1, url2]
     when url1 finishes Immediately render the DOM, without waiting for url2
     If url2, url3 finishes before url1, then store url2, url3 and paint the DOM after url1 arrives
     Paint the DOM with order [url1, url2, url3]

我使用 Promise 的工作:

// Fired all 3 urls at the same time
p1 = fetch(url1)
p2 = fetch(url2)
p3 = fetch(url3)

p1.then(updateDom)
  .then(() => p2)
  .then(updateDom)
  .then(() => p3)
  .then(updateDom)

我想在 Observables 中做同样的事情。

from(urls)
  .pipe(
      mergeMap(x => fetch(x))
  )

为了并行触发它们,我使用了合并映射,但是如何对结果的顺序进行排序?

【问题讨论】:

标签: rxjs system.reactive


【解决方案1】:

像这样的异步任务保持顺序的最佳方法是使用concatMap

问题是,如果我们单独应用它,我们就会失去并行化。如果我们要这样做:

from(urls)
  .pipe(
      concatMap(x => fetch(x))
  );

在第一个请求完成之前不会触发第二个请求。

我们可以通过将地图分离成自己的运算符来解决这个问题:

from(urls)
  .pipe(
      map(x => fetch(x)),
      concatMap(x => x)
  );

所有请求会同时触发,但结果会按请求顺序发出。

Adrian's 示例,适用于下面的这种方法:

const { from } = rxjs;
const { concatMap, map } = rxjs.operators;

function delayPromise(value, delay) {
  return new Promise(resolve => setTimeout(() => resolve(value), delay));
}

var delay = 3;

from([1, 2, 3]).pipe(
  map(x => delayPromise(x, delay-- * 1000)),
  concatMap(x => x)
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

【讨论】:

  • 感谢您的解决方案。但是很抱歉我只能接受一个答案。
  • @pawankumar 很好,我只是发布它以防其他人发现这个问题
  • 对不起,我不是那个意思,我发现你的答案比接受的要好得多。想以一种有趣的方式说出来。
  • 是的,这应该是公认的答案。干得好。
  • @nilspetersohn 是的,我对 forkJoin 方法的争论感到沮丧,这促使我创建了这个答案。
【解决方案2】:

我找不到任何保留顺序的东西,所以我想出了一些有点复杂的东西。

const { concat, of, BehaviorSubject, Subject } = rxjs;
const { delay, filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}


parallelExecute(
  of(1).pipe(delay(3000)),
  of(2).pipe(delay(2000)),
  of(3).pipe(delay(1000))
).subscribe(result => { console.log(result); });
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"&gt;&lt;/script&gt;

【讨论】:

    【解决方案3】:

    你可以用 fetch 和 paint 然后 forkJoin/Promise.all 形成一个序列

    p1 = fetch(url1)
    p2 = fetch(url2)
    p3 = fetch(url3)
    
    forkJoin(
    from(p1).pipe(tap(_=>paint dom...))
    from(p1).pipe(tap(_=>paint dom...))
    from(p1).pipe(tap(_=>paint dom...))
    ).subscribe()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-10
      • 2020-03-28
      • 2013-12-24
      • 2012-03-24
      • 2021-07-18
      • 2019-01-31
      相关资源
      最近更新 更多