【问题标题】:How to prevent AsyncSubject from completing when the last observer unsubscribes当最后一个观察者取消订阅时,如何防止 AsyncSubject 完成
【发布时间】:2018-04-26 05:40:47
【问题描述】:

当最后一个主题观察者取消订阅该主题时,AsyncSubject 变为可观察的。这里是the quote

当它完成时,它就完成了。主题在使用后不能重复使用 已取消订阅、已完成或出错。

这里是演示:

const ofObservable = Rx.Observable.of(1, 2, 3);
const subject = new Rx.AsyncSubject();

ofObservable.subscribe(subject);

subject.subscribe((v) => {
    console.log(v);
});

subject.unsubscribe((v) => {
    console.log(v);
});

// here I'll get the error "object unsubscribed"
subject.subscribe((v) => {
    console.log(v);
});

如何防止题目完成?

有一个share 运算符:

在 RxJS 5 中,操作符 share() 产生了一个热的 refCounted observable 可以在失败时重试,或在成功时重复。因为 主题一旦出现错误、完成或其他情况就不能重复使用 取消订阅后,share() 运营商将回收死去的主题到 启用重新订阅生成的 observable。

这就是我要找的。但是share 创建了一个主题,我需要AsyncSubject

【问题讨论】:

  • This test 建议它应该按照您期望的方式运行,不是吗?会不会有其他事情发生?
  • 嗯,谢谢,检查this plunker,没有别的了
  • 对我来说,它看起来有点像错字。将subject.subscribe 调用的结果分配给一个变量(结果是订阅)并在订阅上调用unsubscribe - 而不是主题。随着这种变化,笨蛋似乎做了我所期望的。
  • @cartant,谢谢,将其作为答案发布。我也想知道当我取消订阅主题时发生了什么?它应该已经从 of observable 退订,因为后者已经完成,那么当我退订主题时会发生什么?另外,这是否意味着我引用的引用仅适用于普通主题?
  • 我真的不知道在这个主题上拨打unsubscribe 发生了什么。这有点有趣,所以我会研究一下,稍后会写一个答案。我猜你的问题中的引用是否适用取决于你对重用的定义。我的理解是,订阅者可以在异步主题完成后订阅它,但异步主题不能订阅另一个可观察的源。

标签: javascript rxjs


【解决方案1】:

问题出在这一行:

subject.unsubscribe((v) => {
    console.log(v);
});

Subjectimplements ISubscription;这意味着它有一个unsubscribe 方法和一个closed 属性。其implementation of unsubscribe如下:

unsubscribe() {
  this.isStopped = true;
  this.closed = true;
  this.observers = null;
}

这有点残酷。从本质上讲,它切断了与该主题的任何订阅者的所有通信,而无需取消订阅它们。类似地,它不会从它可能碰巧订阅的任何 observable 中取消订阅主题本身。 (它还将主题标记为关闭/停止,这就是您出错的原因。)

鉴于它实际上并没有取消订阅,它应该如何使用尚不清楚。 this test的描述:

it('should disallow new subscriber once subject has been disposed', () => {

暗示这可能是 RxJS 4 的某种遗留问题 - 其中取消订阅被称为处置。

无论它存在的原因是什么,我都建议永远不要调用它。举个例子,看看这个sn-p:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

它将一个主题订阅到一个源 observable,然后订阅一个由该主题组成的 observable。

如果在该主题上调用unsubscribe,则会出现一些问题:

  • 主题对源的订阅没有取消订阅,当源尝试调用主题的next方法时出错;和
  • 对由主题组成的可观察对象的订阅并未取消订阅,因此switchMap 中的interval 可观察对象在unsubscribe 调用后继续发射。

试试看:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

setTimeout(() => {
  console.log("subject.unsubscribe()");
  subject.unsubscribe();
}, 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

这些似乎都不是可取的行为,因此应避免在 Subject 上调用 unsubscribe

相反,您的 sn-p 中的代码应使用 subscribe 调用返回的 Subscription 取消订阅:

const subscription = subject.subscribe((v) => {
  console.log(v);
});
subscription.unsubscribe();

在写完这个答案之后,我从Ben Lesh 中找到了following comment,这符合我的理论,即它与主题的处置有关:

如果您希望主题在完成有用后next 时大声而愤怒地错误,您可以直接在主题实例本身上调用unsubscribe

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多