【问题标题】:RxJava error handling in chain API calls链式 API 调用中的 RxJava 错误处理
【发布时间】:2018-04-04 05:02:21
【问题描述】:

我最近在使用 Rxjava 尝试实现一系列事件(Api callas/数据库操作),并且在处理错误时似乎遇到了障碍。

这就是我想要做的。我正在调用一个 Api,它将检查用户是否存在于数据库中。根据我得到的响应,我正在尝试使用 rxjava 链接一些序列。下图可能解释得更好一些。

                          checkUser()
                         /          \
                       No           Yes
                       /              \
            createUserRemote()       FetchUserNotesRemote()
                      |                    |
                    End               SaveUserNotesLocal()
                                            |
                                           End

我可以将 checkUser() -> FetchUserNotesRemote() -> SaveUserNotesLocal() 序列与以下代码链接在一起。

checkUser()
            .flatMap(id -> {return fetchData(id);})
            .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });

我主要想解决的问题。

  • 我不知道如何处理 checkUser() 返回的情况
    404 http 状态。因为当这种情况发生时,订阅者的 onError
    方法被调用,在我看来这是应该发生的。我怎样才能 处理它,以便当我从 API 收到错误 (404) 响应时, 而不是执行 FetchUserNotesRemote() 和 SaveUserNotesLocal(), 我执行不同的事件链?
  • 我不确定的另一件事是,是否调用了错误 链中的任何可观察对象,订阅者的 onError 方法如何知道 哪个 observable 调用它?

【问题讨论】:

    标签: java android rx-java2


    【解决方案1】:

    1) 要在出错时执行不同的 observables 链,您可以使用方法 onErorrResumeNext()。更多信息在这里:github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

    例子:

    checkUser().flatMap(id -> {return fetchData(id);})
               .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
               .onErrorResumeNext(throwable -> { return doSomethingDifferent(); }
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
    
                    }
    
                    @Override
                    public void onSuccess(Integer integer) {
                        //handle onsuccess here
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        //handle errors here
                    }
                });
    

    2) 如果在您的流中某处抛出异常,则将其传递给订阅者onError()。如果您想知道在哪个部分引发了流错误,您可以添加多个 onErorrResumeNext() 调用,在每个 api 调用之后抛出具体的异常。

        checkUser()
               .onErrorResumeNext(throwable -> { return Observable.error(new CheckUserException()); }
               .flatMap(id -> {return fetchData(id);})
               .onErrorResumeNext(throwable -> { return Observable.error(new FetchDataException()); }
               .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
               .onErrorResumeNext(throwable -> { return Observable.error(new SaveDataException()); }
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
    
                    }
    
                    @Override
                    public void onSuccess(Integer integer) {
                        //handle onsuccess here
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        //handle errors here
                    }
                });
    

    【讨论】:

    • 感谢您的回复。我会试试这个,让你知道。
    • 嗨@noob 你设法让它工作了吗?我也面临类似的情况,当我的 API 超时时,onErrorResumeNext 方法没有被调用,你能发布解决方案吗?
    • @Vie 我发布了答案。
    • @Subayyal onErrorResumeNext() 在我将它添加到链请求中时不会被调用。
    【解决方案2】:

    我完全忘记了这一点。但是@mol 把我推向了正确的方向。我的解决方案有点不同。这可能不是最好的解决方案,但它当时对我有用。

    我首先创建了自己的自定义异常类,如下所示。

    public class CreateUserLocalException extends Exception {
        public CreateUserLocalException(String message) {
            super(message);
        }
    }
    

    然后在我的 checkUser() 函数中,我抛出了我在上面创建的类型的异常,如下所示。

    public Single<String> checkUser(String id) {
        return Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {
                try {
                    GetUserResponseObject getUserResponseObject = apiClient.usersIdGet(id);
                    Log.d("Test", "checkUserCall: Status: " + getUserResponseObject.getStatus());
                    emitter.onSuccess(getUserResponseObject.getBody().getUserId());
                } catch (AmazonServiceException e) {
                    Log.d("Test", "AmazonServiceException : " + e.getErrorMessage());
                    e.printStackTrace();
                    if (e.getErrorMessage().equals("timeout")) {
                        throw new SocketTimeoutException();
                    } else {
                        throw new CheckUserException(Integer.toString(e.getStatusCode()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new CheckUserException(Integer.toString(AppConstants.ERROR));
                }
            }
        });
    }
    

    然后在我的调用链中,如果发生错误,onError(throwable) 会被调用,我正在检查 instanceof Exception 以确定发生了哪种异常。下面是函数链的代码。

    cloudSyncHelper.checkUser(user.getUser_id())
            .retry(3, new Predicate<Throwable>() {
                @Override
                public boolean test(Throwable throwable) throws Exception {
                    Log.d("Test", throwable.toString());
                    if (throwable instanceof SocketTimeoutException) {
                        Log.d("Test", "Time out.. Retrying..");
                        return true;
                    }
                    return false;
                }
            })
            .flatMap(s -> {
                return cloudSyncHelper.createUserLocal(user)
                        .onErrorResumeNext(throwable -> {
                            Log.d("Test", "onErrorResumeNext, throwable message: " + throwable.getMessage());
                            if (throwable instanceof CreateUserLocalException) {
                                if (Integer.parseInt(throwable.getMessage()) == AppConstants.LOCAL_DB_DUPLICATE) {
                                    return Single.just(user.getUser_id());
                                }
                            }
                            return Single.error(new CreateUserLocalException(Integer.toString(AppConstants.LOCAL_DB_ERROR)));
                        });
            })
            .flatMap(id -> {
                return cloudSyncHelper.fetchData(id)
                        .retry(3, new Predicate<Throwable>() {
                            @Override
                            public boolean test(Throwable throwable) throws Exception {
                                Log.d("Test", throwable.toString());
                                if (throwable instanceof SocketTimeoutException) {
                                    Log.d("Test", "Time out.. Retrying..");
                                    return true;
                                }
                                return false;
                            }
                        });
            })
            .flatMap(notesResponseObject -> {
                return cloudSyncHelper.saveFetchedData(notesResponseObject);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }
    
                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                    googleSignInButton.setEnabled(true);
                    progressBar.setVisibility(View.GONE);
                    Log.d("Test", "onSuccess Called");
                    getSharedPreferences(AppConstants.AppName, MODE_PRIVATE).edit().putBoolean("isFirstRun", false).apply();
                    startActivity(new Intent(LoginScreen.this, HomeScreen.class));
                }
    
                @Override
                public void onError(Throwable e) {
    
                    if (e instanceof SocketTimeoutException) {
                        googleSignInButton.setEnabled(true);
                        progressBar.setVisibility(View.GONE);
                        Log.d("Test", "Socket Time Out");
                        Utils.createToast(LoginScreen.this, "Socket timed out");
                        return;
                    }
    
                    int code = Integer.parseInt(e.getMessage());
                    Log.d("Test", "onError Called");
                    if (e instanceof CheckUserException) {
                        Log.d("Test", "onError CheckUserException");
                        if (code == AppConstants.NOTFOUND) {
                            newUserSequence(user);
                        } else {
                            googleSignInButton.setEnabled(true);
                            progressBar.setVisibility(View.GONE);
                            Utils.createToast(LoginScreen.this, "Unable to user information from cloud. Try again.");
                        }
                    }
                    if (e instanceof CreateUserLocalException) {
                        Log.d("Test", "onError CreateUserLocalException");
                        googleSignInButton.setEnabled(true);
                        progressBar.setVisibility(View.GONE);
                    }
                    if (e instanceof FetchDataException) {
                        Log.d("Test", "onError FetchDataException");
                        if (code == AppConstants.NOTFOUND) {
                            googleSignInButton.setEnabled(true);
                            progressBar.setVisibility(View.GONE);
                            getSharedPreferences(AppConstants.AppName, MODE_PRIVATE).edit().putBoolean("isFirstRun", false).apply();
                            startActivity(new Intent(LoginScreen.this, HomeScreen.class));
                        } else {
                            googleSignInButton.setEnabled(true);
                            progressBar.setVisibility(View.GONE);
                            Log.d("Test", "Unable to fetch data from cloud");
                            Utils.createToast(LoginScreen.this, "Unable to fetch data from cloud. Try again.");
                        }
                    }
                    if (e instanceof SaveDataLocalException) {
                        googleSignInButton.setEnabled(true);
                        progressBar.setVisibility(View.GONE);
                        Log.d("Test", "onError SaveDataLocalException");
                        if (code == AppConstants.LOCAL_DB_ERROR) {
                            Log.d("Test", "Unable to save data fetched from cloud");
                            Utils.createToast(LoginScreen.this, "Unable to save data fetched from cloud");
                        } else {
                            Utils.createToast(LoginScreen.this, "Unable to save data fetched from cloud");
                        }
                    }
                }
            });
    

    希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-12-29
      • 1970-01-01
      • 2017-01-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-01
      • 2019-02-27
      相关资源
      最近更新 更多