【问题标题】:Why didn‘t child subscriber's onComplete() method invoke when use retryWhen() in RxJava为什么在 RxJava 中使用 retryWhen() 时子订阅者的 onComplete() 方法没有调用
【发布时间】:2016-03-10 06:36:38
【问题描述】:

根据retryWhen() 文档:

返回一个与源发出相同值的 Observable 可观察到,但 onError 除外。一个 onError 通知 从源将导致一个 Throwable 项目发射到 Observable 作为参数提供给 notificationHandler 功能。如果该 Observable 调用 onComplete 或 onError 然后重试 将在子订阅上调用 onCompleted 或 onError。否则, 这个 Observable 将重新订阅源 Observable。

当count1值达到3时,会调用subscriber.onCompleted();在我的选项中,在调用subscriber.onCompleted()之后会调用子订阅者的onComplete()方法,并且“child_onCompleted”会出现在输出中,但事实并非如此;为什么会这样?

  count1 = 0;  

      Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("call_onErr ");
                subscriber.onError(new Throwable("gg!"));
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {

                System.out.println("fun_call+" + observable);
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {

                        if (count1 < 3) {
                            count1 = count1 + 1;
                            return Observable.create(new Observable.OnSubscribe<Integer>() {
                                @Override
                                public void call(Subscriber<? super Integer> subscriber) {
                                    System.out.println("fun_call_onNext " + "  count=" + count1);
                                    subscriber.onNext(1000);
                                }
                            });
                        } else
                            return Observable.create(new Observable.OnSubscribe<Integer>() {
                                @Override
                                public void call(Subscriber<? super Integer> subscriber) {
                                    System.out.println("fun_call_onCompleted " + "   " + count1);
                                    subscriber.onCompleted();//this is the subscriber!!!!!!!
                                }
                            });
                    }
                });


            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("child_onCompleted ");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("child_err ");

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("child_onNext " + integer + " ");
            }
        });

输出是:

System.out﹕ fun_call+rx.Observable@3ecb7134
System.out﹕ call_onErr
System.out﹕ fun_call_onNext   count=1
System.out﹕ call_onErr
System.out﹕ fun_call_onNext   count=2
System.out﹕ call_onErr
System.out﹕ fun_call_onNext   count=3
System.out﹕ call_onErr
System.out﹕ fun_call_onCompleted    3

【问题讨论】:

    标签: rx-java


    【解决方案1】:

    问题是你有 flatMap,如果你给它发送一个空的 Observable,它就不会完成。相反,您可以在其中使用特殊值并与 takeWhile 运算符通信以触发完成:

    Observable.error(new Throwable("gg!"))
    .retryWhen(o -> {
        AtomicInteger counter = new AtomicInteger();
        return o.flatMap(e -> {
            if (counter.getAndIncrement() < 3) {
                return Observable.just(0);
            }
            return Observable.just(1);
        })
        .takeWhile(v -> v == 0);
    })
    .subscribe(...);
    

    【讨论】:

    • 感谢您的回答。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-12-24
    • 1970-01-01
    • 2015-06-10
    • 1970-01-01
    • 2016-03-01
    • 2020-08-08
    • 1970-01-01
    相关资源
    最近更新 更多