【发布时间】:2018-06-26 16:17:27
【问题描述】:
我想创建一个 observable,只有当它有订阅者收听它时才会发出项目。订阅者可以随时添加和/或删除,当没有订阅者连接时,在重新连接新订阅者之前可能会有很长的延迟。 我认为可行的一种可能方式是:
observable = Observable.defer(new Callable<ObservableSource<Long>>() {
@Override
public ObservableSource<Long> call() throws Exception {
final AtomicInteger counter = new AtomicInteger();
return Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
emitter = e;
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
counter.incrementAndGet();
startEmitting(emitter);
}
}).doOnDispose(new Action() {
@Override
public void run() throws Exception {
if (counter.decrementAndGet() == 0) {
stopEmitting(emitter);
}
}
});
}
});
这个解决方案可能会起作用,但是 Observable 永远不会完成。那是问题吗? 用 stopEmitting 功能完成后,我想下次有人想订阅时我必须创建一个新的观察者? 此外,我需要将发射器传递给 onSubscribe 或 onDispose 函数的方式感觉很奇怪,我想知道它是否是线程安全的?
谁能推荐一个更好的解决方案?
【问题讨论】: