【问题标题】:RxJava share an Observable's emissions between multiple subscribersRxJava 在多个订阅者之间共享一个 Observable 的排放
【发布时间】:2017-03-27 01:44:58
【问题描述】:

我有以下问题:

我有一个 observable 正在做一些工作,但其他 observable 需要该 observable 的输出才能工作。我曾尝试多次订阅同一个 observable,但在日志中我看到原始 observable 被多次启动。

这就是我的 observable 创建对象:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
            if (mApi == null) {
                //do some work
            }
            subscriber.onNext(mApi);
            subscriber.unsubscribe();
        })

这就是我需要对象的可观察对象

loadApi().flatMap(api -> api....()));

我正在使用

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
                .unsubscribeOn(Schedulers.io()

在所有可观察对象上。

【问题讨论】:

    标签: java android rx-java


    【解决方案1】:

    我不确定我是否正确理解了您的问题,但我认为您正在寻找一种方法来在多个订阅者之间共享 observable 的排放。有几种方法可以做到这一点。一方面,您可以像这样使用Connectable Observable

    ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
    obs.subscribe(item -> System.out.println("Sub A: " + item));
    obs.subscribe(item -> System.out.println("Sub B: " + item));
    obs.connect(); //Now the source observable starts emitting items
    

    输出:

    Sub A: 1
    Sub B: 1
    Sub A: 2
    Sub B: 2
    Sub A: 3
    Sub B: 3
    

    或者,您可以使用PublishSubject

    PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
    subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
    subject.subscribe(item -> System.out.println("Sub B: " + item));    
    Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable
    

    输出:

    Sub A: 1
    Sub B: 1
    Sub A: 2
    Sub B: 2
    Sub A: 3
    Sub B: 3
    

    这两个示例都是单线程的,但您可以轻松地添加 observeOn 或 subscrbeOn 调用以使它们异步。

    【讨论】:

    • 是的,我想从每次订阅的 observable 中获得第一个结果
    • 当我不知道我的 observable 多久被订阅一次,以及订阅者应该在什么时候得到结果时,我应该怎么做
    • 这在this question中得到了很好的回答
    • 这与我的问题无关,但实际上它很好很简单:)
    【解决方案2】:

    首先,使用 Observable.create 很棘手,而且很容易出错。你需要类似的东西

    Observable.create(subscriber -> {
            if (mApi == null) {
                //do some work
            }
            if (!subscriber.isUnsubscribed()) {
               subscriber.onNext(mApi);
               subscriber.onCompleted();
               // Not subscriber.unsubscribe(); 
            }            
     })
    

    你可以使用

    ConnectableObservable<Integer> obs = Observable.just(1).replay(1).autoConnect();
    

    所有后续订阅者都应该获得单个发出的项目

    obs.subscribe(item -> System.out.println("Sub 1 " + item));
    obs.subscribe(item -> System.out.println("Sub 2 " + item));
    obs.subscribe(item -> System.out.println("Sub 3 " + item));
    obs.subscribe(item -> System.out.println("Sub 4 " + item));
    

    【讨论】:

    • 我试过你的,但是 on subscribe 没有被调用。我的 observable 是 public static final。
    • 你用 .publish().replay().autoconnect() 试过你原来的 observable 了吗?
    • 我可能弄乱了运算符的顺序。它可能应该是 .publish().autoconnect().replay(1)。共享操作符等价于 publish().refcount()。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-26
    • 1970-01-01
    • 2015-07-25
    • 2019-09-15
    • 1970-01-01
    相关资源
    最近更新 更多