【问题标题】:In RxJS, how to prevent operators from emitting values if the source observable hasn't emitted any?在 RxJS 中,如果源 observable 没有发出任何值,如何防止操作符发出值?
【发布时间】:2019-10-09 21:47:36
【问题描述】:

我有三个 observable foo$bar$baz$,我将它们合并在一起形成另一个 observable。这按预期工作:

  • 流以>>> 开头
  • 每个值一个一个地发出
  • 流以<<< 结尾

const foo$ = of('foo');
const bar$ = of('bar');
const baz$ = of('baz');

merge(foo$, bar$, baz$).pipe(startWith('>>>'), endWith('<<<')).subscribe(str => {
  console.log(str);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
<script>const {merge, of} = rxjs; const {startWith, endWith} = rxjs.operators;</script>

现在,如果上面的三个 observable 都没有发出值,我不想输出 &gt;&gt;&gt;&lt;&lt;&lt;。所以startWithendWith 只有在merge(foo$, bar$, baz$) 实际发出一个值时才能“运行”。

为了模拟我使用过滤功能拒绝foo$bar$baz$ 发出的所有值。

const foo$ = of('foo').pipe(filter(() => false));
const bar$ = of('bar').pipe(filter(() => false));
const baz$ = of('baz').pipe(filter(() => false));

merge(foo$, bar$, baz$).pipe(startWith('>>>'), endWith('<<<')).subscribe(str => {
  console.log(str);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
<script>const {merge, of} = rxjs; const {startWith, endWith, filter} = rxjs.operators</script>

但是,正如您在输出中看到的那样,startWithendWith 都发出了它们的值,即使 merge() 没有产生任何值。

问题:如果 observable 没有发出单个值,如何防止 startWithendWith 执行?

【问题讨论】:

    标签: rxjs rxjs6


    【解决方案1】:

    来自startWith的文档:

    返回一个 Observable,它在开始发射源 Observable 发射的项目之前发射您指定为参数的项目。

    所以startWithendWith 将始终运行。

    我不知道您的预期结果应该是什么,但如果您只想为每个发出的值连接字符串,您可以使用 mapswitchMap 运算符。

    编辑: 使用map 连接每个值的示例:

    const foo$ = of('foo').pipe(filter(() => false));
    const bar$ = of('bar').pipe(filter(() => false));
    const baz$ = of('baz').pipe(filter(() => false));
    
    merge(foo$, bar$, baz$).pipe(map(v => `>>>${v}<<<`)).subscribe(str => {
      console.log(str);
    });
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
    <script>const {merge, of} = rxjs; const {startWith, endWith, filter, map} = rxjs.operators</script>

    【讨论】:

    • 我的预期输出是,如果merge() 没有发出值,那么我不应该在控制台中看到&gt;&gt;&gt;&lt;&lt;&lt;。例如,您能否分叉一个我的 sn-p 并告诉我如何使用 switchMap?谢谢你的回答。
    • 我有一个流可能会也可能不会产生价值。当流完成时,我需要连接发出的字符串,然后才应用我的前缀/后缀。我可以将我的词缀应用到管道之外,但我想知道是否可以将这种行为添加到其中。希望这能澄清一点。
    • @customcommander 我已经添加了一个例子,希望这就是你想要的。
    • 谢谢你,但这不是将词缀应用于每个发出的值吗?我只需要发出一次&gt;&gt;&gt;&lt;&lt;&lt;。前者在流的开头,后者在流的末尾。
    • 我可以看到混乱,我已经编辑了我的问题。对不起。
    【解决方案2】:

    实现这一目标的一种方法是使用 materialize-dematerialize operators 对:

    source$.pipe(
      // turn all events on stream into Notifications
      materialize(),
    
      // wrap elements only if they are present
      switchMap((event, index) => {
        // if its first event and its a value
        if (index === 0 && event.kind === 'N') {
          const startingNotification = new Notification('N', '>>>', undefined);
          return of(startingNotification, event);
        }
    
        // if its a completion event and it not a first event
        if (index > 0 && event.kind === 'C') {
          const endingNotification = new Notification('N', '<<<', undefined);
          return of(endingNotification, event);
        }
    
        return of(event);
      }),
    
      // turn Notifications back to events on stream
      dematerialize()
    )
    

    在操场上玩这个代码:
    https://thinkrx.io/gist/5a7a7f1338737e452ff6a1937b5fe05a
    为方便起见,我还在那里添加了一个空的错误源

    希望对你有帮助

    【讨论】:

    • 我认为,另一种方法是将throwIfEmptystartWithendWith 一起使用,然后是catchError,这将抑制“空错误”并传递原始错误。不过需要测试这种方法。总帐
    【解决方案3】:

    第一个条件很简单。你可以在第一个发射前加上concatMap

    mergeMap((v, index) => index === 0 ? of('>>>', v) : of(v))
    

    第二个条件更棘手。你想要基本上与defaultIfEmpty相反。我想不出任何简单的解决方案,所以无论如何我可能会使用endWith,如果它是第一个也是唯一一个发射(这意味着源刚刚完成而没有发射任何东西),则忽略发射:

    endWith('<<<'),
    filter((v, index) => index !== 0 || v !== '<<<'),
    

    完整示例:

    const foo$ = of('foo');//.pipe(filter(() => false));
    const bar$ = of('bar');//).pipe(filter(() => false));
    const baz$ = of('baz');//.pipe(filter(() => false));
    
    merge(foo$, bar$, baz$).pipe(
      mergeMap((v, index) => index === 0 ? of('>>>', v) : of(v)),
      endWith('<<<'),
      filter((v, index) => index !== 0 || v !== '<<<'),
    ).subscribe(console.log);
    

    现场演示:https://stackblitz.com/edit/rxjs-n2alkc?file=index.ts

    【讨论】:

    • 谢谢你。我觉得@fridoo 的回答更加地道。 +1 虽然
    • 请注意,该解决方案可能会为“热” Observables 产生不同的结果,其中第一个发射可能被吞下。我想您可以将第一个发射添加到startWith,但是无论如何它对于“冷” Observables 的行为都会有所不同......所以我不知道这里最好的解决方案是什么。可能取决于您的特定用例。
    【解决方案4】:

    您可以将合并后的 observable 分配给一个变量,从流中获取第一个元素,然后在发出至少一个值时映射到要执行的 observable。

    const foo$ = of('foo').pipe(filter(() => false))
    const bar$ = of('bar').pipe(filter(() => false))
    const baz$ = of('baz').pipe(filter(() => false))
    
    const merged$ = merge(foo$, bar$, baz$);
    merged$.pipe(
      take(1),
      switchMap(() => merged$.pipe(
        startWith('>>>'),
        endWith('<<<')
      ))
    ).subscribe(console.log);
    

    https://stackblitz.com/edit/rxjs-phspsc

    【讨论】:

    • 感谢您的回答。当merged$ 根本不发射时,这有效。否则,如果合并的 observable 之一发出值,startWithendWith 都不会发出它们的值。
    • @customcommander 不,只要其中一个合并的 observable 发出一个值,您就会切换到另一个使用 startWithendWith 发出的流。请查看我的 stackblitz 并删除 .pipe(filter(() =&gt; false)) 以获取其中一个 observables。
    • 道歉。我确实尝试过,但是没有用,但是当我从您的答案中重写代码时,我一定是在某处打错了字。我复制/粘贴了您的代码,它就起作用了;)我非常喜欢这个答案。在我决定一个之前,我会等待更多的答案(?)。 +1 虽然
    • @customcommander,fridoo,这导致对merged$ 的双重订阅。因此,需要share 源并在switchMap 内部执行concat
    • @kos 我的理解是switchMap 将取消订阅源 observable。所以应该没问题,除非我弄错了。
    【解决方案5】:

    我看到所有答案都是组合运算符完成的,但是仅使用运算符完成这个解决方案有点棘手,为什么不直接创建自己的 observable?我认为这个解决方案是最容易理解的没有隐藏的聪明之处,没有复杂的运算符组合,没有订阅重复......

    解决方案:

    const foo$ = of('foo')//.pipe(filter(() => false));
    const bar$ = of('bar')//.pipe(filter(() => false));
    const baz$ = of('baz')//.pipe(filter(() => false));
    
    function wrapIfOnEmit$(...obs) {
      return new Observable(observer => {
        let hasEmitted;
        const subscription = merge(...obs).subscribe((data) => {
          if (!hasEmitted) {
            observer.next('>>>');
            hasEmitted = true;
          }
          observer.next(data);
        },
        (error) => observer.error(error),
        () => {
          if (hasEmitted) observer.next('<<<');
          observer.complete();
        })
        return () => subscription.unsubscribe();
      })
    }
    
    wrapIfOnEmit$(foo$, bar$, baz$).subscribe(console.log);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
    <script>const {merge, of, Observable} = rxjs; const {filter} = rxjs.operators</script>

    希望这会有所帮助!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-27
      • 1970-01-01
      • 2016-06-23
      • 2022-09-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多