【问题标题】:Refactoring RxJava Observable concat code and error handling in functional way以函数式方式重构 RxJava Observable concat 代码和错误处理
【发布时间】:2018-06-28 22:05:52
【问题描述】:

如何改进下面的代码并在这里更好地进行错误处理,以便代码在本质上更具功能性。

final Observable<A> aResponse =a.update(52, 33, "759", obj);

final Observable<B> bResponse =b.fetch(52, 759);
Map map1 = new HashMap();
Map map2 = new HashMap();

两个返回 observables 的 api 调用。结果连接在 map1 和 map2 下方。


.concat我这里是用concat来合并两个api的结果。我如何在此处使用 .zip() 或任何其他功能。可以在这里使用 flatMap() 吗?

.onErrorResumeNext 用于在响应抛出错误时继续执行 bResponse。如果 aResponse 失败,如何更好地记录错误。我应该在 onErrorResumeNext 中执行此操作吗?

.subscribe 用于观察者可以看到由两个可观察对象的串联发出的项目。
onError 如果 bResponse 失败,我正在使用 onError 记录错误。我怎样才能做得更好?

onNext 在 onNext 中,如果它是 instanceOf B,我将 Object 类型转换为 B,然后填充 map1 和 map2 中的项目。我怎样才能以更好的方式解决这个问题?

Observable.concat(aResponse, bResponse)
            .onErrorResumeNext(new Func1<Throwable, Observable<B>>() {
                @Override
                public Observable<B> call(Throwable throwable) {
                    return bResponse;
                }
            })
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {

                 System.out.println("The response list after the fututres");
               }
                @Override
                public void onError(Throwable e) {

                    System.out.println("ERROR IN BRESPONSE");
                    e.printStackTrace();
                }

                @Override
                public void onNext(Object o) {
                    if(o instanceof B)
                    {
                        ((B)o).getSomething().stream().forEach( s ->  {
                                   map1.put(s.getId(),s.getNumber());
                                   map2.put(s.getId(), s.getList());
                                });
                    }

                }
            });
}

【问题讨论】:

    标签: java java-8 rx-java observable reactive-programming


    【解决方案1】:

    如果aResponse需要在你开始观察bResponse之前完成,最好不要将观察者链合并到Observable&lt;Object&gt;中。

    aResponse
      .doOnError( error -> log.error( "Error on A stream", error )
      .onErrorResumeNext( Observable.empty() )
      .toCompletable()
      .andThen( bResponse )
      .subscribe( ... );
    

    本质上,上面的链监视aResponse,直到它完成,忽略所有元素。当aResponse 完成后,你将开始观察bResponse,在B 元素上做你需要做的事情。

    如果你需要对A元素做一些事情,你可以在toCompletable()之前添加doOnNext()到观察者链中。

    【讨论】:

      猜你喜欢
      • 2016-07-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-09-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-10-04
      相关资源
      最近更新 更多