【问题标题】:Preventing premature completion of an async pipeable operator in RxJS防止在 RxJS 中过早完成异步管道操作符
【发布时间】:2018-07-25 10:27:01
【问题描述】:

我正在使用 RxJS 6 创建pipeable operators,但不清楚当操作是异步时如何complete()观察者。

对于同步操作,逻辑很简单。在下面的示例中,来自源 Observable 的所有值都将传递给 observer.next(),然后调用 observer.complete()

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

但是,对于 异步 操作,我有点不知所措。在下面的示例中,异步操作由对setTimeout() 的调用表示。显然,observer.complete() 将在任何值被传递给observer.next() 之前被调用。

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

所以问题是:什么是惯用的 RxJS 方法来实现对 observer.complete() 的调用仅在所有值异步传递给 observer.next() 之后进行?我应该手动跟踪待处理的呼叫还是有更“被动”的解决方案?

(请注意,上面的示例是对我的实际代码的简化,并且对 setTimeout() 的调用旨在表示“任何异步操作”。我正在寻找一种处理异步的通用方法管道操作符中的操作,而不是关于如何处理 RxJS 中的延迟或超时的建议。)

【问题讨论】:

  • 我不明白问题出在哪里。如果你想在nexts 之后调用complete(),然后将complete 调用放入next 处理程序中,并添加一些你需要的额外逻辑。
  • @martin 是的,这就是我在问题“我应该手动跟踪待处理呼叫吗?”时所指的方法。。如果没有其他惯用的替代方法,那我最终会这样做。
  • 最惯用、最通用和可组合的方式是@Picci 在他的回答中提出的 - 不要从头开始创建新的 observable 并订阅它。相反,只需使用内置运算符,让他们为您完成所有订阅管理。如果由于某种原因这对您不起作用,请提供有关您的特定问题的更多详细信息。

标签: javascript asynchronous rxjs rxjs-pipeable-operators


【解决方案1】:

一种思路可能是重组您的asyncOp 以使用其他运算符,例如mergeMap

这是使用这种方法重现您的示例的代码

const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));

这是否值得考虑取决于您的asyncOp 做了什么。如果它是异步的,因为它依赖于一些回调,例如在 https 调用或从文件系统读取的情况下,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。

【讨论】:

  • 我想说的更强烈:这种方法应该适用于大多数现实生活中的情况。
【解决方案2】:

仍然希望就更被动/惯用的实现获得意见,但以下是我暂时决定采用的方法。

本质上,我只是将计数器用于飞行中的操作 (pending) 并使其仅在源 observable 完成 (completed) 并且没有挂起的操作 (@ 987654323@).

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    let pending = 0; // the number of in-flight operations
    let completed = false; // whether or not the source observable completed
    
    return source.subscribe({
      next: (x) => {
        pending++;
        
        setTimeout(() => {              
          observer.next(x);
          
          if (!--pending && completed) { // no ops pending and source completed
            observer.complete();
          }
        }, 100);
      },
      error: (e) => observer.error(err),
      complete: () => {
        completed = true;
        
        if (!pending) { // no ops pending
          observer.complete();
        }
      }
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

【讨论】:

  • 这是惯用的。这就是运营商的工作方式。你还需要做更多的工作才能让异步操作符按预期工作。
  • @abetteroliver 使用setTimeout 不仅不是惯用的,而且至少在某种程度上是错误的,因为与调度程序不兼容,并且在unsubscribed 时应该清除超时。
  • @arturgrzesiak 您的 cmets 做得很好,但公平地说,正如问题中所述,对 setTimeout() 的调用只是表示“任何异步操作”的占位符。
  • @RobbyCornelissen 我明白这一点,但一种惯用的方法是将异步操作升级到可观察的,例如通过defer / bindCallback / bindNodeCallback / Observable.create 并使用它来组成更大的流。对于setTimeout,惯用的方法是使用以下之一:timerdelaydelayWhen。上面的 sn -p 最大的问题是你可以启动一个异步操作而没有取消它的选项。并且很容易导致奇怪的行为和/或资源泄漏。
  • @arturgrzesiak 是的,我完全理解并同意。此后,我重构了我的代码以使用与 Picci 提供的答案中提出的模式非常相似的模式。感谢您的反馈!
【解决方案3】:

我创建了这个可运行的 StackBlitz demo 来展示我认为应该做什么。

这里的想法是使用toArray() 将源 observable 中的所有值获取到一个数组中。 toArray()之后的代码是一个单值(数组)。

注意:有很多方法(操作员)可以解决问题,这只是基于我从这个问题中理解的一个例子——这对 RxJS Observables 来说既是好事也是坏事。希望这可以帮助。 :-)

主要的演示代码是:

// --- for each value, do the async service
of(...[1, 2, 3]).pipe(
  // let each value be processed by both async service...
  concatMap(no => myAsyncService$(no)),
  concatMap(no => myAsyncService2$(no)),

  // --- toArray() combines all the values (i.e. they completed)
  toArray(),

  // --- this will only be called once - with all completed values
  // --- testing: try commenting the toArray() to see the values as individual "next" value
  tap(val => {
    // see the combined values
    console.log(val)
  })
).subscribe();

【讨论】:

  • 感谢您的回答,但我的问题是关于自定义管道运算符中的异步操作。也许我误解了您的解决方案,但我真的无法在您的代码中找到它......此外,toArray() 方法有点问题,因为只要源 observable 没有完成,就不会输出任何内容。这在观察具有 3 个值的数组时不是问题,但在侦听 websocket 时会出现问题(例如)。
  • 你说得有道理。我想我误解了你的问题,自定义管道运算符是我没有尝试过的。嗯...我将不得不重新考虑。请忽略我的回答。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-04-21
  • 1970-01-01
  • 1970-01-01
  • 2023-03-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多