【问题标题】:Observable merge() detect which observable is triggeredObservable merge() 检测触发了哪个 observable
【发布时间】:2023-04-02 10:55:01
【问题描述】:

我正在使用值列表创建Observable 列表,foreach 值自定义Observable。我使用合并运行它们,但我无法检测到哪个触发了onNext()onError()

就像下面的代码:

 List<Observable<MyHttpRsObj>> observables = new ArrayList<>();

    for (String param : paramsList) {
        Observable<MyHttpRsObj> objObservable = MyRestClient.get().doHttpRequest(param);
        observables.add(fileUploadObservable);
    }

    Observable<BaseRs> combinedObservables = Observable.merge(observables);

    combinedObservables.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    //called only once when all Observables finished
                }

                @Override
                public void onError(Throwable throwable) {
                    //how to know which Observable has error (which param)
                }


                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {
                    //how to know which Observable has sccess  (which param)
                }
            });

【问题讨论】:

  • 在每个可观察到的形式 doHttpRequest 中,doOnError 获取异常并用您的自定义异常包装它,该异常内部也有一个 param 。至于成功,你可以做同样的伎俩,但使用 map 代替并返回 MyHttpRsObjparam 在里面。

标签: java android rx-java observable


【解决方案1】:

不可能知道哪个可观察对象触发了错误,因为您将所有可观察对象合并为一个。

最好的办法是为每个可观察对象使用一个观察者。最后一个用于合并的 Observable。

像这样:

 List<Observable<MyHttpRsObj>> observables = new ArrayList<>();

    for (String param : paramsList) {
        //change to connectable Observable
        ConnectableObservable<MyHttpRsObj> objObservable = MyRestClient.get()
                 .doHttpRequest(param)
                 .publish();

       //don't forget to connect
        observable.connect();
        observables.add(fileUploadObservable);

        //subscribe for each observable
        objObservable.observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<MyHttpRsObj>() {
                    @Override
                    public void onCompleted() {
                        //just partial completed
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        //you can access param from here

                    }


                    @Override
                    public void onNext(MyHttpRsObj myHttpRsObj) {
                        //access onNext here
                        //you can access param from here
                    }
                });
    }

    Observable<BaseRs> combinedObservables = Observable.merge(observables);

    combinedObservables.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    //called only once when all Observables finished
                }

                @Override
                public void onError(Throwable throwable) {
                    //don't handle error here
                }


                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {

                }
            });

PS:使用ConnectableObservable 避免发射两次

【讨论】:

  • NO,这个方法每个请求只执行2次! merge()这里没有理由。
猜你喜欢
  • 2018-08-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多