【问题标题】:RxJS / wait for an event to go through the entire pipeline before emitting the nextRxJS / 等待一个事件通过整个管道,然后再发出下一个
【发布时间】:2022-01-16 21:37:42
【问题描述】:

我希望构建一个 rxjs 管道,仅当当前事件已通过整个管道(FIFO 结构)时才会发出下一个事件。

const { from } = rxjs;
const { map, filter } = rxjs.operators;

// the event `2` should only be emitted
// when the event `1` has reached the end of the pipeline.
// and so on and so fort
from([1, 2, 3, 4, 5, 6, 7]).pipe(
  map((n) => n ** 2),
  filter((n) => n % 2),
).subscribe(console.log);


/**
-1-------|
-2-------|-1
...
**/
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.4.0/rxjs.umd.js" integrity="sha512-DRDXreq+zyiPhlKTfJ5pgzWRn+6SgJ7cPoRxMNksyHmUEOjKiKIoqvssNYNwknpvVbQsVN5hlhh3sp0rxbU7Bg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>

【问题讨论】:

  • 在我看来,在您的示例中,顺便说一句,它是完全同步的,这正是发生的情况:只有在 1 在订阅提供的函数中通知后,2 才进入管道。如果事情是异步的,那么exhaustMap 运算符可能会有所帮助。我错过了什么吗?

标签: javascript rxjs queue rxjs6


【解决方案1】:

你正在寻找的东西重新组合了我某种 "backpressure" in RxJS,它不再被维护,并且 RxJS 没有提供任何可以完全按照你描述的操作符(或 Observable 创建方法)。

但是,您可以使用 zip() 运算符执行相同的操作,因为它只会在所有源 Observable 发出相同数量的 next 发射时才会发射。

import { Subject, zip, from, map, startWith } from 'rxjs';

const done$ = new Subject<void>();

zip(
  from([1, 2, 3, 4, 5, 6, 7]),
  done$.pipe(
    startWith(void 0), // trigger the first emission
  ),
)
  .pipe(
    map(([value]) => value),
  )
  .subscribe(value => {
    console.log(value);
    setTimeout(() => done$.next(void 0), 500);
  });

现场演示:https://stackblitz.com/edit/rxjs-oqzeqx?devtoolsheight=60

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-13
    • 1970-01-01
    • 1970-01-01
    • 2020-04-14
    相关资源
    最近更新 更多