【问题标题】:onCompleted called early before processing RxJava concatMaponCompleted 在处理 RxJava concatMap 之前提前调用
【发布时间】:2018-01-10 20:02:51
【问题描述】:

我有两个 observables 并使用 concatDelayError 进行顺序处理。

我的问题是 onNext 和 onCompleted 在处理之前被提前调用。 我如何知道所有处理都已使用 concatDelayError 完成?

伪代码:

public Observable<Integer> concat(){
     int x = 10;
     int y = 20;


        Observable obx = Observable.create(emitter -> {
                    try {
                        int x = doSomeThing();
                        emitter.onNext(x);
                        emitter.onCompleted();
                    } catch (SQLiteException e) {
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.BUFFER);   
        Observable oby = Observable.create(emitter -> {
                    try {
                        int y = doSomeThing();
                        emitter.onNext(y);
                        emitter.onCompleted();
                    } catch (SQLiteException e) {
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.BUFFER);       
        Observable concated =  Observable.concatDelayError(ob1,ob2)
                 .compose(applySchedulers())
                 .replay().autoConnect();  

  }

    //somewhere else

    concat().subscribe(mReplaySubject);


    //somewhere else

     mReplaySubject.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            launchActivity(SplashActivity.this, HomeActivity.class);
                            SplashActivity.this.finish();
                        }    
                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }    
                        @Override
                        public void onNext(Integer value) {

                        }
                    });

我正在使用带有 autoConnect() 的 reply() 并订阅 ReplySubject,因为我需要共享单个订阅。

【问题讨论】:

  • 您确定您的Observer.onCompleted() 在一次订阅中被多次调用吗?
  • 请通过将 .doOnEach() 应用于源 observables 和连接的 observables 来添加一些日志记录。否则确实不清楚。
  • @akarnokd 我正在使用 ReplaySubject。我已经改变了问题。你能回答这个问题吗?
  • @akarnokd 我登录完成,它没有多次调用,但它甚至在处理完成之前就提前调用了
  • 您是否尝试过不使用ReplaySubject 直接订阅concatedreplay().autoConnect() 那时已经为你做缓存了。

标签: java android rx-java rx-android concatmap


【解决方案1】:

您可以在replay().autoConnect() 之前使用doOnSubscribe,但您还需要确保返回的Observable 最多设置一次:

final class OnceObservable {

    final AtomicReference<Observable<Integer>> ref = new AtomicReference<>();

    int x;
    int y;

    Observable<Integer> concat() {
        Observable<Integer> obs = ref.get();
        if (obs != null) {
            return;
        }

        obs = Observable.concatDelayError(
            Observable.fromCallable(() -> doSomething()),
            Observable.fromCallable(() -> doSomethingElse())
        )
        .doOnSubscribe(() -> {
            x = 10;
            y = 20;
        })
        .replay().autoConnect();

        if (!ref.compareAndSet(null, obs)) {
            obs = ref.get();
        }
        return obs;
    }
}

【讨论】:

  • 最后一个问题:.replay().autoConnect();自动缓存订阅?
  • 仍然在提前调用 onCompleted()
  • 什么订阅?另外如何检测早期的onCompleted
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-01-25
  • 1970-01-01
  • 2017-04-06
  • 1970-01-01
  • 2017-07-07
  • 1970-01-01
相关资源
最近更新 更多