【发布时间】:2018-01-10 20:02:51
【问题描述】:
我有两个 observables 并使用 concatDelayError 进行顺序处理。
我的问题是 onNext 和 onCompleted 在处理之前被提前调用。 我如何知道所有处理都已使用 concatDelayError 完成?
伪代码:
public Observable<Integer> concat(){
int x = 10;
int y = 20;
Observable obx = Observable.create(emitter -> {
try {
int x = doSomeThing();
emitter.onNext(x);
emitter.onCompleted();
} catch (SQLiteException e) {
emitter.onError(e);
}
}, Emitter.BackpressureMode.BUFFER);
Observable oby = Observable.create(emitter -> {
try {
int y = doSomeThing();
emitter.onNext(y);
emitter.onCompleted();
} catch (SQLiteException e) {
emitter.onError(e);
}
}, Emitter.BackpressureMode.BUFFER);
Observable concated = Observable.concatDelayError(ob1,ob2)
.compose(applySchedulers())
.replay().autoConnect();
}
//somewhere else
concat().subscribe(mReplaySubject);
//somewhere else
mReplaySubject.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
launchActivity(SplashActivity.this, HomeActivity.class);
SplashActivity.this.finish();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
}
});
我正在使用带有 autoConnect() 的 reply() 并订阅 ReplySubject,因为我需要共享单个订阅。
【问题讨论】:
-
您确定您的
Observer.onCompleted()在一次订阅中被多次调用吗? -
请通过将 .doOnEach() 应用于源 observables 和连接的 observables 来添加一些日志记录。否则确实不清楚。
-
@akarnokd 我正在使用 ReplaySubject。我已经改变了问题。你能回答这个问题吗?
-
@akarnokd 我登录完成,它没有多次调用,但它甚至在处理完成之前就提前调用了
-
您是否尝试过不使用
ReplaySubject直接订阅concated?replay().autoConnect()那时已经为你做缓存了。
标签: java android rx-java rx-android concatmap