您可以通过编写一个包装初始 AsyncSubject 的小类来轻松实现这一点
import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'
class SingleSubscriberObservable<T> {
private newSubscriberSubscribed = new Subject();
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSubscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
}
然后你可以在你的例子中尝试一下:
const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)
let fired = false;
function setFired(label:string){
return ()=>{
if(fired == true) throw new Error("Multiple subscriptions executed");
console.log("FIRED", label);
fired = true;
}
}
function logDone(label: string){
return ()=>{
console.log(`${label} Will stop subscribing to source observable`);
}
}
const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));
setTimeout(()=>{
as.next(undefined);
as.complete();
}, 500)
这里的关键是这部分:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSusbscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
每次有人调用 subscribe 时,我们都会发送 newSubscriberSubscribed 主题。
当我们订阅我们使用的底层 Observable 时
takeUntil(this.newSubscriberSubscribed)
这意味着当下一个订阅者调用时:
this.newSubscriberSubscribed.next()
之前返回的 observable 将完成。
因此,这将导致您要问的是,每当有新订阅出现时,先前的订阅就完成了。
应用程序的输出将是:
First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable
编辑:
如果您想在第一个订阅者保持订阅并且所有未来订阅立即完成的情况下执行此操作(这样当最早的订阅者仍然订阅时,没有其他人可以订阅)。你可以这样做:
class SingleSubscriberObservable<T> {
private isSubscribed: boolean = false;
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
if(this.isSubscribed){
return Observable.empty().subscribe(next, error, complete);
}
this.isSubscribed = true;
var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
return new Subscription(()=>{
unsubscribe.unsubscribe();
this.isSubscribed = false;
});
}
}
我们保留一个标志this.isSusbscribed 以跟踪当前是否有人订阅。我们还返回一个自定义订阅,我们可以使用它在取消订阅时将此标志设置回 false。
每当有人尝试订阅时,如果我们改为订阅一个空的Observable,它将立即完成。输出如下所示:
Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable