【问题标题】:How to pass the item indexes in Observable.fromIterable to onNext in subscribe method?如何将 Observable.fromIterable 中的项目索引传递给 subscribe 方法中的 onNext?
【发布时间】:2020-05-27 00:41:59
【问题描述】:

我正在尝试使用 RxJava2 加载数据并将其放入 SparseArray。我通过从数组中调用 URL 来获取数据,但是我需要将响应解析并按照数组中 URL 的顺序插入到 SparseArray 中,因此我需要在 mUrls.getGroups() 中传递 String 项的索引

提前致谢!

@GET
Single<ResponseBody> getChannels(@Url String url);


groups = new SparseArray<>();


Observable.fromIterable(mUrls.getGroups())
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //
        // How can I access the index of the String item in the array?
        //
        .subscribe(new Observer<ResponseBody>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(ResponseBody responseBody) {
                Group group = GroupParser.parseList(responseBody.byteStream(), index);


                groups.put(index, group);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onComplete() {

            }
 });

编辑:

这是实施的解决方案:

 Observable.defer(() -> {
                        AtomicInteger counter = new AtomicInteger();
                        return Observable.fromIterable(mUrls.getGroups())
                                .map(url -> new Pair(url, counter.getAndIncrement()));
                    }).flatMapSingle(pair ->
                            aPI.getChannels(pair.first.toString())
                                    .map(responseBody -> new Pair(responseBody, pair.second))
                                    .subscribeOn(Schedulers.io())
                    )
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Pair>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Pair pair) {
                            Pair<ResponseBody, Integer> resultPair =  (Pair<ResponseBody, Integer>) pair;
                            Group group = GroupParser.parseList(resultPair.first.byteStream(),
                                    resultPair.second);

                            groups.put(resultPair.second, group);
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "***** message: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.i(TAG, "***** onComplete.");
                        }
                    });

【问题讨论】:

  • 你想做什么?
  • 我正在尝试在 onNext 方法中获取字符串项的索引。目前我只能访问 responseBody。

标签: java android retrofit2 reactive-programming rx-java2


【解决方案1】:

如果您按顺序处理 URL,则只需在 Observer 中引入一个 index 字段:

Observable.fromIterable(mUrls.getGroups())
    .concatMapSingle(url -> getChannels(url))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<ResponseBody>() {

         int index;  // <----------------------------------------------------

         // ...

         @Override
         public void onNext(ResponseBody responseBody) {
             Group group = GroupParser.parseList(responseBody.byteStream(), index);


             groups.put(index, group);

             index++;  // <---------------------------------------------------------
         }

         // ...
    });

但是,如果您同时处理 URL,则必须将每个 URL 与索引配对并标记它。例如,给定一个Pair 类:

 Observable.defer(() -> {
     AtomicInteger counter = new AtomicInteger();
     return Observable.fromIterable(mUrls.getGroups())
     .map(url -> Pair.of(url, counter.getAndIncremenet()));
 })
 .flatMapSingle(urlIndex -> 
     getChannels(urlIndex.first)
     .map(v -> Pair.of(v, urlIndex.second))
     .subscribeOn(Schedulers.io())
 )
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer<Pair<ResponseBody, Integer>>() {

         // ...

         @Override
         public void onNext(Pair<ResponseBody, Integer> pair) {
             Group group = GroupParser.parseList(pair.first.byteStream(), pair.second);


             groups.put(pair.second, group);
         }

         // ...
    });

【讨论】:

  • 非常感谢@akarnokd。实施后我会接受你的回答。
  • 我实现了第二个解决方案,并进行了一些修改。修改后的解决方案将添加到问题的编辑部分。再次感谢您。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-12-02
  • 1970-01-01
  • 2017-08-12
  • 1970-01-01
  • 1970-01-01
  • 2020-01-22
  • 2020-06-20
相关资源
最近更新 更多