【问题标题】:Retrofit + RxJava Android onNext never called but onComplete isRetrofit + RxJava Android onNext 从未调用,但 onComplete 是
【发布时间】:2018-09-09 01:38:48
【问题描述】:

我一直在为我的 Android 应用程序的 REST API 调用使用 Retrofit,并且效果很好。但是,我遇到了有关失去连接和请求失败的问题。我的应用程序涉及以编程方式连接到 WiFi 网络,然后在连接后发出 API 调用以进行初始化。然而,有时这些连接很慢,我需要一种方法来在它们失败时重新发送这些请求。这就是我转向 RxJava 的地方,因为 Retrofit 不支持这个。我想要做的是,一旦连接到新的 WiFi 网络,就会向 REST API 发出 ping,如果出现网络错误,请等待一秒钟并重试 ping。这是我的代码:

接口中的函数声明:

@Headers("Content-Type: application/json")
@GET("something")
io.reactivex.Observable<String> ping();

Activity 中的改造声明

Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(Data.cloudEndpoint)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();

avfsService = retrofit.create(MyClass.class);

尝试在 Activity 中执行 API 调用

service.ping()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                Log.d(TAG, "retryWhen");
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        })
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscibe " + d.toString());
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
                launch();
            }
        });

所以这里执行的时候是我在 Logcat 中看到的输出

onSubscibe null
retryWhen
onComplete

所以永远不会调用 onNext,我不太清楚为什么。我认为每次从 Observable 发出数据时都会调用 onNext,并且我认为通过创建 Observable 并订阅它,API 调用将被执行,并且响应将由 Observable 发出。情况似乎并非如此,因为我现在什至无法重新创建网络错误,并且会立即调用 onComplete。

我在这里使用blockingSubscribe,因为我看到其他人说在io线程上执行并在主线程上观察会有所帮助,但没有运气。当我只使用 .subscribe() 时,我看到了相同的行为。

我想我遗漏了一些东西,但不明白我的 API 调用是否没有被执行,或者我只是没有得到正确的响应。

非常感谢任何帮助。

【问题讨论】:

    标签: android rest retrofit rx-java2


    【解决方案1】:

    您的困惑可能来自retryWhen 运算符仅调用您的重试逻辑函数一次。因为它传递给你一个ObservableThrowables,它希望你在你得到一个从throwableObservable 发出的新error 时发出你的重试可观察对象。最简单的方法是 flatmap 将错误添加到您的重试逻辑中 --

    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                        Log.d(TAG, "retryWhen");
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {
                                Log.d(TAG, "retrying");
                                return Observable.timer(1, TimeUnit.SECONDS);
                            }
                        });
                    }
                })
    

    如果您运行上述程序,您仍然会看到“retryWhen”被调用一次,然后在出现错误时每秒“重试”一次,直到您收到onNextonComplete。您可能希望添加代码以在一段时间后放弃,或者至少确保正确处理订阅,使其不会永远运行。

    附带说明一下,这段代码(以及一般的 RxJava)使用 lambdas 更具可读性。

    如果您仍然想了解retryWhen,请查看 Dan Lew 的 this article 有一个很好的解释。

    【讨论】:

    • 嘿,感谢您的回复,当我回到工作岗位时,我将不得不尝试一下。我仍然对何时进行实际 API 调用感到困惑。是在创建 Observable 的时候吗?或者是当观察者订阅该观察者时?还是其他时候?
    • @OhWhen 订阅Observable时进行API调用。
    【解决方案2】:

    试试这个:

    service.ping()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    Log.d(TAG, "retryWhen");
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            })
            .subscribe(new Subscriber<String>() {    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext " + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                    e.printStackTrace();
                }
    
                @Override
               public void onCompleted() {
                    Log.d(TAG, "onComplete");
                    launch();
               }
        });
    

    【讨论】:

      【解决方案3】:

      您的问题源于您使用的是blockingSubscribe。这会阻塞线程,直到 observable 完成。

      由于您告诉它在 android 线程上进行观察,并且这可能是被阻塞的线程,它永远没有机会发送这些 onNext 调用,除非已调用 onComplete,这使它忽略任何进一步的onNext 电话。

      您给定的代码可能不会死锁,因为如果没有明确告知要保持顺序,observeOn 保留优先处理 onCompleteonError 调用的权利。这意味着它允许onComplete 调用通过,从而解除线程阻塞。

      【讨论】:

        猜你喜欢
        • 2019-12-22
        • 1970-01-01
        • 1970-01-01
        • 2020-02-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-02-20
        相关资源
        最近更新 更多