【问题标题】:Angular 2+ using RxJS - complete vs unsubscribe regarding take(1) and first()Angular 2+ 使用 RxJS - 关于 take(1) 和 first() 的完整与取消订阅
【发布时间】:2018-04-09 15:11:03
【问题描述】:

阅读了一些文档和问题后,我发现使用 first() 或 take(1) 在完成时实际取消订阅的天气有点不清楚。我想我的困惑是围绕“完成”与“取消订阅”。要说一个可观察的完成,这是否也意味着订阅被取消订阅?我正在考虑将其用于垃圾收集,而我需要知道可观察对象在 first() 或 take(1) 完成后没有保留任何引用。

如果这些功能没有取消订阅,我需要知道完成后最简单的取消订阅方法。或者这甚至是必要的?

【问题讨论】:

标签: angular rxjs angular2-observables


【解决方案1】:

源代码中的一些东西,

(do) first() 和 take(1) 在完成后实际上取消订阅

看起来是这样。

take.ts

protected _next(value: T): void {
  const total = this.total;
  const count = ++this.count;
  if (count <= total) {
    this.destination.next(value);
    if (count === total) {
      this.destination.complete();
      this.unsubscribe();
    }
  }
}

说一个 observable 完成了,这是否也意味着订阅被取消了?

Subscription.ts 中遇到了这个(文档中没有看到)

/**
 * Adds a tear down to be called during the unsubscribe() of this
 * Subscription.
 *
   ...
 */
add(teardown: TeardownLogic): Subscription {

所以我认为可以使用 teardown 来验证是否调用了取消订阅。

const source1 = Observable.range(1, 10).take(6)

const subscription1 = source1.subscribe(x => console.log('subscription1'))
  .add(() => console.log('teardown1'))
// Emits 6x then 'teardown1'

const subscription2 = source1.take(4).subscribe(x => console.log('subscription2'))
  .add(() => console.log('teardown2'))
// Emits 4x then 'teardown2'

但请注意,take() 仅取消其自身下游的订阅,而不是 所有 可观察对象的订阅者

const source2 = Observable.range(1, 10)

const subscription3 = source2.subscribe(x => console.log('subscription3'))
  .add(() => console.log('teardown3'))
// Emits 10x then 'teardown3'

const subscription4 = source2.take(5).subscribe(x => console.log('subscription4'))
  .add(() => console.log('teardown4'))
// Emits 5x then 'teardown4'

我需要知道在 first() 或 take(1) 完成后 observable 没有保留任何引用。

这有点棘手,这里是Observable.subscribe() 方法。似乎观察者没有保留对它的三个参数中的任何一个的引用,而是订阅保留了对 observable 的引用。

subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
          error?: (error: any) => void,
          complete?: () => void): Subscription {

  const { operator } = this;
  const sink = toSubscriber(observerOrNext, error, complete);

  if (operator) {
    operator.call(sink, this.source);
  } else {
    sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
  }

  if (sink.syncErrorThrowable) {
    sink.syncErrorThrowable = false;
    if (sink.syncErrorThrown) {
      throw sink.syncErrorValue;
    }
  }
  return sink;
}

这可以在下面的测试代码中看到。当订阅处于活动状态(关闭:false)时,订阅者拥有 _subscriptions 中引用的 observable

const source3 = Observable.interval(1000)
const subscription5 = source3.subscribe(x => {})
console.log(source3)
console.log(subscription5)

控制台输出:

IntervalObservable 
  period: 1000
  scheduler: AsyncScheduler {...}
  _isScalar: false
  __proto__: Observable

Subscriber 
  closed: false
  destination: SafeSubscriber {...}
  isStopped: false
  syncErrorThrowable: false
  syncErrorThrown: false
  syncErrorValue: null
  _parent: null
  _parents: null
  _subscriptions: [AsyncAction]
  __proto__: Subscription

但是当我们使用take(1) 关闭订阅时,

const source3 = Observable.interval(1000).take(1)
const subscription5 = source3.subscribe(x => {})
console.log(source3)
console.log(subscription5)

Subscriber _subscriptions 设置为 null,释放引用。

Subscriber 
  closed: true
  destination: SafeSubscriber {...}
  isStopped: true
  syncErrorThrowable: false
  syncErrorThrown: false
  syncErrorValue: null
  _parent: null
  _parents: null
  _subscriptions: null
  __proto__: Subscription

我不确定这是否可以被视为所有可观察/运营商/订阅者链的明确证明,但至少表明了一种验证您的特定用例的方法。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-02-14
    • 1970-01-01
    • 2023-03-24
    • 2021-11-18
    • 1970-01-01
    • 1970-01-01
    • 2018-01-19
    • 1970-01-01
    相关资源
    最近更新 更多