【问题标题】:RxJava: OnErrorFailedException. Identifying the correct causeRxJava:OnErrorFailedException。确定正确的原因
【发布时间】:2017-10-29 05:26:29
【问题描述】:

受到 T.Nurkiewicz 的“Reactive Programming with RxJava”的启发,我尝试将它应用到我正在从事的项目中,这就是我面临的问题。

我有一个 Rest 端点,它接受输入流和用户名,并返回更新后的用户名的链接或返回错误请求错误。以下是我尝试使用 RxJava 实现它的方法:

    @PUT
    @Path("{username}")
    public Response updateCredential(@PathParam("username") final String username, InputStream stream) {
        CredentialCandidate candidate = new CredentialCandidate();
        Observable.just(repository.getByUsername(username))
                .subscribe(
                    credential -> {
                            serializeCandidate(candidate, stream);
                            try {
                                repository.updateCredential(build(credential, candidate));
                            } catch (Exception e) {
                                String msg = "Failed to update credential +\""+username+"\": "+e.getMessage();
                                throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build());
                            }
                        },
                        ex -> {
                            String msg = "Couldn't update credential \""+username+"\""
                            + ". A credential with such username doesn't exist: " + ex.getMessage();
                            logger.error(msg);
                            throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build());
                });//if the Observable completes without exceptions we have a success case
        Map<String, String> map = new HashMap<>();
        map.put("path", "credential/" + username);
        return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build();
}

我的问题在第 11 行(onNext 方法的 catch 子句)。这是可以快速演示发生了什么的日志输出:

19:23:50.472 [http-listener(4)] ERROR com.vgorcinschi.rimmanew.rest.services.CredentialResourceService             - Couldn't update credential "admin". A credential with such username doesn't exist: Failed to update credential +"admin": Password too weak! 

所以onNext 方法中抛出的异常会进入上游并最终在onError 方法中!显然this works as designed,但我对如何返回错误请求错误的正确原因感到困惑。毕竟在我的测试用例中,存储库找到了用户的凭据,正确的错误是建议的密码太弱。这是产生错误的辅助方法:

private Credential build(Credential credential, CredentialCandidate candidate) {
         if(!isOkPsswd.test(candidate.getPassword())){
            throw new BadRequestException("Password too weak!", Response.status(Response.Status.BAD_REQUEST).build());
        }
...
}

我对响应式编程还很陌生,所以我意识到我可能遗漏了一些显而易见的东西。浏览这本书并没有让我得到答案,因此我将不胜感激。

以防万一,这是完整的堆栈跟踪:

updateCredentialTest(com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest)  Time elapsed: 0.798 sec  <<< ERROR!
rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
    at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.lambda$updateCredential$9(CredentialResourceService.java:245)
    at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
    at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
    at rx.Subscriber.setProducer(Subscriber.java:209)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
    at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
    at rx.Observable.subscribe(Observable.java:10238)
    at rx.Observable.subscribe(Observable.java:10205)
    at rx.Observable.subscribe(Observable.java:10045)
    at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.updateCredential(CredentialResourceService.java:238)
    at com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest.updateCredentialTest(CredentialResourceServiceTest.java:140)

【问题讨论】:

    标签: java functional-programming rx-java reactive-programming


    【解决方案1】:

    看来你没有正确掌握响应式编程原理。

    第一件事是 Observable 通过他们的 API 是异步的,而您试图通过尝试直接从方法返回 Response 值而不是返回发出的 Observable&lt;Response&gt; 来强制它是同步 API这个Response 值随着时间的推移通过其onNext() 通知。
    这就是为什么您要与异常作斗争的原因,每个通知 lambda 方法(onNext/onError)都被 Observable 机制封装,以便创建一个遵守某些规则(Observable contract)的正确流,一些在这些预期的行为中,错误应该被重定向到onError() 方法,这是异常捕获方法,你不应该在那里抛出,抛出那里将被视为致命错误,并会被抛出OnErrorFailedException 吞没。

    理想情况下是这样的:

    public Observable<Response> updateCredential(@PathParam("username") final String username,
                                                 InputStream stream) {
        rerurn Observable.fromCallable(() -> {
            CredentialCandidate candidate = new CredentialCandidate();
            Credential credential = repository.getByUsername(username);
            serializeCandidate(candidate, stream);
            repository.updateCredential(build(credential, candidate));
            Map<String, String> map = new HashMap<>();
            map.put("path", "credential/" + username);
            return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build();
        })
                .onErrorReturn(throwable -> {
                    String msg = "Failed to update credential +\"" + username + "\": " + e.getMessage();
                    throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build());
                });
    }
    

    使用fromCallable 以便在订阅时使请求发生(而Observable.just(repository.getByUsername(username)) 将在Observable 构造时同步执行),成功路径是callable 本身,而如果发生任何错误,您将对其进行转换使用onErrorReturn 运算符来处理您的自定义异常。

    使用他的方法,您将返回 Observable 对象,该对象将在您订阅它时起作用,您将获得 Observable 和反应式方法的所有好处,例如能够与其他一些操作组合,能够从外部指定它是同步(当前线程)还是在其他线程上异步(使用Scheduler)。

    有关反应式编程的更详细说明,我建议从 André Staltz 的这个伟大的 tutorial 开始。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-10-07
      • 2016-04-12
      • 1970-01-01
      相关资源
      最近更新 更多