【问题标题】:How to process each item differently in a Observable stream(RxJava)如何在 Observable 流中以不同方式处理每个项目(RxJava)
【发布时间】:2016-07-11 22:39:23
【问题描述】:

让我举个例子:给定从 1 到 10 的数字流,以不同的方式处理偶数和奇数。在不同的线程中处理奇数并将此转换应用于它们 (2 * i)。在主线程中处理偶数并将此转换应用于它们 (2 * i - 1)。订阅者如下:

finalObservable.subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer t) {
      System.out.println(Thread.currentThread() + " " + t);
   }});

输出应该是

Thread-1 2
main 3
Thread-1 6
main 7
Thread-1 10
main 11
Thread-1 14
main 15
Thread-1 18
main 19

如何使用 RxJava-Observables 操作符做到这一点?

【问题讨论】:

    标签: java functional-programming rx-java


    【解决方案1】:

    处理这个问题的最优雅的方法是使用Observableshare() 运算符。在 huuuuge 的简化中,它允许您将 observable 拆分为多个。因此,在您的情况下,表示数字流的可观察对象可以分为两个可观察对象。一个用于奇数,一个用于偶数。

    假设 allNumbers(在您的示例中为 finalObservable)代表 数字流:

    final Observable<Integer> allNumbers =
            Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
                    .share();
    Observable<Integer> oddNumbers = allNumbers.filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer % 2 != 0;
        }
    });
    Observable<Integer> evenNumbers = allNumbers.filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer % 2 == 0;
        }
    });
    
    final Action1<Integer> printingAction = new Action1<Integer>() {
        @Override
        public void call(Integer t) {
            System.out.println(Thread.currentThread() + " " + t);
        }
    };
    
    evenNumbers.subscribeOn(Schedulers.computation()).subscribe(printingAction);
    oddNumbers.subscribeOn(AndroidSchedulers.mainThread()).subscribe(printingAction);
    

    并使用 Retrolambda 进行简化:

    final Observable<Integer> allNumbers =
            Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
                    .share();
    Observable<Integer> oddNumbers = allNumbers.filter(integer -> integer % 2 != 0);
    Observable<Integer> evenNumbers = allNumbers.filter(integer -> integer % 2 == 0);
    
    final Action1<Integer> printingAction = 
             t -> System.out.println(Thread.currentThread() + " " + t);
    
    evenNumbers.subscribeOn(Schedulers.computation()).subscribe(printingAction);
    oddNumbers.subscribeOn(AndroidSchedulers.mainThread()).subscribe(printingAction);
    

    您没有确切指定需要在哪个线程中处理什么,因此您可能需要更正 subscribeOn 参数并可能添加 observeOn 运算符。根据您的需要。

    【讨论】:

    • 谢谢,是的,这很好用。我还有一个问题。除了订阅 2 个不同的 observable,有没有办法合并 Observable,使输出与输入的顺序相同?
    • 您可以使用Observable.merge,但整个想法是,使用两个不同的可观察对象,您可以在线程方面对它们进行不同的处理。
    【解决方案2】:

    一般的想法是这样的(使用flatMap):

    Observable.from(new Integer[]{1, 2, 3, 4, 5})
              .flatMap(number -> {
                  if (number % 2 == 0) {
                      return Observable.just(2 * number - 1);
                  } else {
                      return Observable.fromCallable(() -> 2 * number)
                                       .subscribeOn(Schedulers.io());
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      System.out.println(Thread.currentThread() + " " + integer);
                  }
              });
    

    【讨论】:

    • 我很难知道何时使用 flatmap 与 map,但为什么不能只映射到 return 2*number,例如?
    • @cricket_007 没错,只要我们不必为不同的值使用不同的线程,我们就能做到
    • 我认为这与它有关。我猜从订阅运行线程是不好的做法?
    • @cricket_007 你的意思是用非反应式仪器改变线程,而是用纯Java?你可以做到,但它看起来不会那么好。
    • 我只是好奇这是否可能。我仍然掌握 Rx 的窍门 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多