【问题标题】:How to subscribe exactly once to an element from AsyncSubject (consumer pattern)如何从 AsyncSubject 中只订阅一次元素(消费者模式)
【发布时间】:2017-03-09 19:19:20
【问题描述】:

rxjs5 中,我有一个AsyncSubject 并想多次订阅它,但只有一个订阅者应该收到next() 事件。所有其他人(如果他们尚未取消订阅)应立即获得complete() 事件,而无需next()

例子:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);

【问题讨论】:

  • 主题被设计为多播,这样他们的所有订阅者都会收到事件。我不确定 RxJs 是否会支持你正在尝试做的事情。这确实像是一个 X-Y 问题——您能否让我们更深入地了解您想要完成的工作?

标签: javascript typescript ecmascript-6 producer-consumer rxjs5


【解决方案1】:

您可以通过编写一个包装初始 AsyncSubject 的小类来轻松实现这一点

import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'

class SingleSubscriberObservable<T> {
    private newSubscriberSubscribed = new Subject();

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        this.newSubscriberSubscribed.next();
        return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
    }
}

然后你可以在你的例子中尝试一下:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)

let fired = false;

function setFired(label:string){
    return ()=>{
        if(fired == true) throw new Error("Multiple subscriptions executed");
        console.log("FIRED", label);
        fired = true;
    }
}

function logDone(label: string){
    return ()=>{
       console.log(`${label} Will stop subscribing to source observable`);
    }
}

const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));

setTimeout(()=>{
    as.next(undefined);
    as.complete();
}, 500)

这里的关键是这部分:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    this.newSubscriberSusbscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

每次有人调用 subscribe 时,我们都会发送 newSubscriberSubscribed 主题。

当我们订阅我们使用的底层 Observable 时

takeUntil(this.newSubscriberSubscribed)

这意味着当下一个订阅者调用时:

this.newSubscriberSubscribed.next()

之前返回的 observable 将完成。

因此,这将导致您要问的是,每当有新订阅出现时,先前的订阅就完成了。

应用程序的输出将是:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

编辑:

如果您想在第一个订阅者保持订阅并且所有未来订阅立即完成的情况下执行此操作(这样当最早的订阅者仍然订阅时,没有其他人可以订阅)。你可以这样做:

class SingleSubscriberObservable<T> {
    private isSubscribed: boolean = false;

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        if(this.isSubscribed){
            return Observable.empty().subscribe(next, error, complete);    
        }
        this.isSubscribed = true;
        var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
        return new Subscription(()=>{
            unsubscribe.unsubscribe();
            this.isSubscribed = false;
        });
    }
}

我们保留一个标志this.isSusbscribed 以跟踪当前是否有人订阅。我们还返回一个自定义订阅,我们可以使用它在取消订阅时将此标志设置回 false。

每当有人尝试订阅时,如果我们改为订阅一个空的Observable,它将立即完成。输出如下所示:

Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable

【讨论】:

  • this.subscriptions.next() 未知,因为您的示例中没有 this.subscriptions 字段?
  • 如果订阅者在 .next() 事件之前取消订阅,我认为您的示例无法正确处理。请参阅我的其他评论:“不是第一个订阅者应该解雇,而是最早的订阅者仍然订阅”
  • 我编辑过,应该是this.newSubscriberSubscribed,它是上次编辑时留下的,当时我更改了名称以使其更清晰。
  • 从您的问题中不清楚这是您想要的行为。我编辑了答案以包含 SingleSubscriberObservable 的不同版本,这将使第一个订阅者成为唯一的订阅者。
【解决方案2】:

最简单的方法是将您的 AsyncSubject 包装在另一个对象中,该对象仅处理调用 1 个订阅者的逻辑。假设您只想调用第一个订阅者,下面的代码应该是一个很好的起点

let as = new AsyncSubject();

const createSingleSubscriberAsyncSubject = as => {
    // define empty array for subscribers
    let subscribers = [];

    const subscribe = callback => {
        if (typeof callback !== 'function') throw new Error('callback provided is not a function');

        subscribers.push(callback);

        // return a function to unsubscribe
        const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
        return unsubscribe;
    };

    // the main subscriber that will listen to the original AS
    const mainSubscriber = (...args) => {
        // you can change this logic to invoke the last subscriber
        if (subscribers[0]) {
            subscribers[0](...args);
        }
    };

    as.subscribe(mainSubscriber);

    return {
        subscribe,
        // expose original AS methods as needed
        next: as.next.bind(as),
        complete: as.complete.bind(as),
    };
};

// use

const singleSub = createSingleSubscriberAsyncSubject(as);

// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));

// remove first subscriber
unsub1();

setTimeout(() => {
    as.next(undefined);
    as.complete();
    // only 'subscriber 2' is printed
}, 500);

【讨论】:

  • 第一个订阅者不正确,因为订阅者可以取消订阅。它应该是“仍然订阅的最早订阅者”
  • 这就是它现在正在做的事情。它将按顺序将订阅者添加到数组中,如果其中任何一个取消订阅,它将从数组中删除,因此数组中的第一个元素将始终是仍然订阅的最早元素。如果这不是你的意思,请告诉我
  • 一个小评论:您的解决方案不会将 .complete() 发送给其他订阅者,但不是每个人都需要这个,我知道如何添加它,所以我奖励你。
  • 我的答案更短,发送.complete() 性能更高(没有数组搜索)和更惯用的 RX。哦,好吧....
  • @Dyna 告诉我你希望其他订阅者在调用.complete() 时做什么,为了完整起见我会添加它
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-21
  • 2018-03-14
  • 2012-12-31
  • 1970-01-01
  • 1970-01-01
  • 2017-07-17
相关资源
最近更新 更多