【问题标题】:Combine Observables to run in parallel结合 Observables 并行运行
【发布时间】:2021-04-22 17:35:45
【问题描述】:

如何将两者结合起来,使其并行或串行运行?

Observable<Data>) dataRequestOne;
Observable<Data>) dataRequestTwo;

【问题讨论】:

    标签: android optimization rx-java observable


    【解决方案1】:

    平行:

    Observable.merge(dataRequestOne, dataRequestTwo)
    

    序列号:

    Observable.concat(dataRequestOne, dataRequestTwo);
    

    【讨论】:

    • merge 仅在可观察对象位于不同线程上时才能并行工作。
    【解决方案2】:

    试试 zip 运算符:http://reactivex.io/documentation/operators/zip.html

    Observable.zip(dataRequestOne, dataRequestTwo, new Func2<Data, Data, Object>() {
            @Override
            public Object call(Data o, Data o2) {
                return null;
            }
        });
    

    【讨论】:

      【解决方案3】:

      如果您想在 onNext 中对每个请求使用串行响应,则需要使用合并运算符

      http://reactivex.io/documentation/operators/merge.html

      Observable.mergeDelayError(
                      dataRequestOne.subscribeOn(Schedulers.newThread()),
                      dataRequestTwo.subscribeOn(Schedulers.newThread())
              )
      

      【讨论】:

        【解决方案4】:

        要实现这一点,您可以这样做:

        Observable.just(dataRequestOne, dataRequestTwo).flatMap(new Func1<Data, Observable<Data>>() {
                @Override
                public Observable<Data> call(Data data) {
                    return Observable.just(data)
                                     .subscribeOn(Schedulers.io);
                }
            });
        

        现在每个请求都将在单独的线程上处理。 FlatMap 还有一个可选的 maxConcurrent 参数,所以如果你有更多的 observable,你可以在必要时控制处理请求的线程数量。

        【讨论】:

          【解决方案5】:

          使用merge一次并行运行2个observables的示例

          val parallelObservables = mutableListOf<Single<String>>()
          val executor = Executors.newFixedThreadPool(2)
          val schedulers = Schedulers.from(executor)
          for (i in 0..4) {
              val observable = Single.create<String> {
                  Thread.sleep(2000)
                  it.onSuccess("" + i + " " + Thread.currentThread().name)
              }.subscribeOn(schedulers) // need to subscribe here
              parallelObservables.add(observable)
          }
          
          Log.i("TAG", "start")
          Single.merge(parallelObservables)
              .subscribe {
                  Log.i("TAG", "complete $it")
              }
          }
          

          Output

          2021-04-22 16:31:51.790 32016-32016/com.example.androidrxtest I/TAG: start
          2021-04-22 16:31:53.891 32016-32055/com.example.androidrxtest I/TAG: complete 0 pool-1-thread-1
          2021-04-22 16:31:53.894 32016-32056/com.example.androidrxtest I/TAG: complete 1 pool-1-thread-2
          2021-04-22 16:31:55.894 32016-32055/com.example.androidrxtest I/TAG: complete 2 pool-1-thread-1
          2021-04-22 16:31:55.897 32016-32056/com.example.androidrxtest I/TAG: complete 3 pool-1-thread-2
          2021-04-22 16:31:57.896 32016-32055/com.example.androidrxtest I/TAG: complete 4 pool-1-thread-1
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2019-08-04
            • 2017-09-07
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2014-12-02
            相关资源
            最近更新 更多