【问题标题】:RxJava and parallel execution of observer codeRxJava 和观察者代码的并行执行
【发布时间】:2016-02-16 07:00:39
【问题描述】:

我使用 RxJava Observable api 有以下代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });

我的期望是观察代码,即 subscribe() 方法中的代码将在我指定计算调度程序后并行执行。相反,代码仍在单线程上按顺序执行。如何使用 RxJava api 让代码并行运行。

【问题讨论】:

    标签: java system.reactive rx-java


    【解决方案1】:

    当谈到它的异步/多线程方面时,RxJava 经常被误解。多线程操作的编码很简单,但理解抽象是另一回事。

    关于 RxJava 的一个常见问题是如何实现并行化,或者从 Observable 中同时发出多个项目。当然,这个定义打破了 Observable Contract,它规定 onNext() 必须按顺序调用,并且一次不能由多个线程同时调用。

    要实现并行性,您需要多个 Observable。

    这在一个线程中运行:

    Observable<Integer> vals = Observable.range(1,10);
    
    vals.subscribeOn(Schedulers.computation())
              .map(i -> intenseCalculation(i))
              .subscribe(val -> System.out.println("Subscriber received "
                      + val + " on "
                      + Thread.currentThread().getName()));
    

    这在多个线程中运行:

    Observable<Integer> vals = Observable.range(1,10);
    
    vals.flatMap(val -> Observable.just(val)
                .subscribeOn(Schedulers.computation())
                .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));
    

    代码和文字comes from this blog post.

    【讨论】:

      【解决方案2】:

      RxJava 2.0.5 引入了parallel flowsParallelFlowable,这使得并行执行更简单,更具声明性。

      您不再需要在flatMap 中创建Observable/Flowable,您只需在Flowable 上调用parallel(),它就会返回ParallelFlowable

      它不像普通的 Flowable 那样功能丰富,因为并发性会引发 Rx 合约的许多问题,但你有基本的 map()filter() 等等,在大多数情况下应该足够了。

      所以不是来自@LordRaydenMK 的这个流程,而是回答:

      Observable<Integer> vals = Observable.range(1,10);
      
      vals.flatMap(val -> Observable.just(val)
              .subscribeOn(Schedulers.computation())
              .map(i -> intenseCalculation(i))
          ).subscribe(val -> System.out.println(val));
      

      现在你可以这样做了:

      Flowable<Integer> vals = Flowable.range(1, 10);
      
      vals.parallel()
              .runOn(Schedulers.computation())
              .map(i -> intenseCalculation(i))
              .sequential()
              .subscribe(val -> System.out.println(val));
      

      【讨论】:

      • AFAIK,flowable 用于具有背压的流。如果我们不需要背压,那么我们应该使用 Observable。是否有像 flowable 一样的 Observable 的并行实现?
      • 只使用 backpressure.NONE 和 observable 一样.. 虽然 UNBOUNDED_IN 会更好
      【解决方案3】:

      为此,您必须指定 subscribeOn(Schedulers.computation()) 而不是 observeOn(Schedulers.computation())。 在subscribeOn 中,您声明要在哪个线程中发出值。 在observeOn 中,您声明要在哪个线程中处理并观察它们。

      【讨论】:

        【解决方案4】:

        使用flatMap并指定订阅Schedulers.computation()将实现并发。

        这是一个使用Callable的更实际的例子,从输出中我们可以看到完成所有任务大约需要2000毫秒。

        static class MyCallable implements Callable<Integer> {
        
            private static final Object CALLABLE_COUNT_LOCK = new Object();
            private static int callableCount;
        
            @Override
            public Integer call() throws Exception {
                Thread.sleep(2000);
                synchronized (CALLABLE_COUNT_LOCK) {
                    return callableCount++;
                }
            }
        
            public static int getCallableCount() {
                synchronized (CALLABLE_COUNT_LOCK) {
                    return callableCount;
                }
            }
        }
        
        private static void runMyCallableConcurrentlyWithRxJava() {
            long startTimeMillis = System.currentTimeMillis();
        
            final Semaphore semaphore = new Semaphore(1);
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
                    .flatMap(new Function<MyCallable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
                            return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
                        }
                    })
                    .subscribeOn(Schedulers.computation())
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
        
                        }
        
                        @Override
                        public void onNext(@NonNull Object o) {
                            System.out.println("onNext " + o);
                        }
        
                        @Override
                        public void onError(@NonNull Throwable e) {
        
                        }
        
                        @Override
                        public void onComplete() {
                            if (MyCallable.getCallableCount() >= 4) {
                                semaphore.release();
                            }
                        }
                    });
        
        
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
            System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis));
        }
        

        【讨论】:

          【解决方案5】:

          这仍然以相同的顺序出现。即使在新线程上

              Observable<Integer> ob3 = Observable.range(1, 5);
          
              ob3.flatMap(new Func1<Integer, Observable<Integer>>() {
          
                  @Override
                  public Observable<Integer> call(Integer pArg0) {
          
                      return Observable.just(pArg0);
                  }
          
              }).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
          
                  @Override
                  public Integer call(Integer pArg0) {
          
                      try {
                          Thread.sleep(1000 - (pArg0 * 100));
                          System.out.println(pArg0 + "  ccc   " + Thread.currentThread().getName());
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
          
                      return pArg0;
                  }
          
              }).subscribe();
          

          输出

          1 ccc RxNewThreadScheduler-1

          2 ccc RxNewThreadScheduler-1

          3 ccc RxNewThreadScheduler-1

          4 ccc RxNewThreadScheduler-1

          5 ccc RxNewThreadScheduler-1

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2023-03-11
            • 2018-06-11
            • 2021-05-02
            • 1970-01-01
            • 2012-01-04
            相关资源
            最近更新 更多