【问题标题】:Observable that would emit only when subscribed to仅在订阅时才会发出的 Observable
【发布时间】:2018-06-26 16:17:27
【问题描述】:

我想创建一个 observable,只有当它有订阅者收听它时才会发出项目。订阅者可以随时添加和/或删除,当没有订阅者连接时,在重新连接新订阅者之前可能会有很长的延迟。 我认为可行的一种可能方式是:

observable = Observable.defer(new Callable<ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> call() throws Exception {
            final AtomicInteger counter = new AtomicInteger();

            return Observable.create(new ObservableOnSubscribe<Long>() {

                @Override
                public void subscribe(ObservableEmitter<Long> e) throws Exception {
                    emitter = e;
                }
            }).doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    counter.incrementAndGet();
                    startEmitting(emitter);
                }
            }).doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    if (counter.decrementAndGet() == 0) {
                        stopEmitting(emitter);
                    }
                }
            });
        }
    });

这个解决方案可能会起作用,但是 Observable 永远不会完成。那是问题吗? 用 stopEmitting 功能完成后,我想下次有人想订阅时我必须创建一个新的观察者? 此外,我需要将发射器传递给 onSubscribe 或 onDispose 函数的方式感觉很奇怪,我想知道它是否是线程安全的?

谁能推荐一个更好的解决方案?

【问题讨论】:

    标签: rx-java rx-java2


    【解决方案1】:

    我对 rx-java2 了解不多,但我对 observable 模式有一些建议。

    你也许可以创建一个名为 ObservableEmitter 的对象。在这个类中,您可以创建一个类似 subscribe(Subscribersubscriber) 的方法和一个方法 emit()。

    订阅者接口随后由您的订阅者实现。我会调用方法接收(消息消息)。

    在代码中它看起来像这样:

    public interface Subscriber {
      void receive(Message msg);
    }
    
    public class ObservableEmitter {
     private List<Subscriber> subscribers = new ArrayList<Subscriber>();
     public subscribe(Subscriber sub) {
      subscribers.add(sub);
     }
     public void emit(Message msg) {
       for(Subscriber sub : subscribers) {
         sub.receive(msg);
       }
     }
    }
    

    这样你只会在有订阅者的情况下发出消息。您的最小示例中缺少一些代码:startEmitting() 以及您如何调用 observable

    【讨论】:

    • 您好 Thomas,我认为您的解决方案可以在阵列上进行一些同步。但是,我渴望 rx 解决方案能够使用更简单的线程以及混合和转换订阅者中的流的能力。 startEmitting 只是说这是我开始/停止发射发射器所需的东西的地方......
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-18
    • 2022-07-11
    • 1970-01-01
    相关资源
    最近更新 更多