【问题标题】:RxJava asynchronous alternative to doOnNextRxJava 异步替代 doOnNext
【发布时间】:2018-07-03 13:29:04
【问题描述】:

我想使用 RxJava 进行长时间操作,同时不阻止在 subscribe 方法(或其他方式)上使用它们。 longOperation() 只是在数据库中保存价值,因此它不会影响价值本身。我有以下代码:

Observable.interval(1, TimeUnit.SECONDS)
          .doOnNext { 
              value -> longOperation(value)
           }
          .subscribe {
              finalConsumer(it)  
           }

如何让它异步?

谢谢。

【问题讨论】:

    标签: rx-java


    【解决方案1】:

    使用flatMapsubscribeOn

    Observable.interval(1, TimeUnit.SECONDS)
      .flatMap(value ->
          Completable.fromAction(() ->
            longOperation(value))
          .toObservable()
          .subscribeOn(Scheduler.io()))
      .subscribe {
         subscriber  
      }
    

    您可能还希望通过将Schedulers.io() 替换为Schedulers.from(Executors.newFixedThreadPool(n)) 来限制调度程序。

    【讨论】:

    • finalConsumer(it) 永远不会被调用,因为你将原始值 flatMap 到一个从不发出任何东西的 Observable 中。此外,这个解决方案会让我们在 finalConsumer 执行之前等待 longOperation 完成,这听起来像是 OP 试图避免的。
    • 是的,我并没有太在意 finalConsumer 的东西,但是如果用户想要对保存到数据库的值做更多的事情,那么同样的技术也可以用于 Single 而不是 Completable。如果 OP 想要更多,那么 OP 可以发表评论。
    【解决方案2】:

    为什么不使用subscribeOn?如果你使用subscribeOn(Schedulers.io())而不使用observeOn以上doOnNext你的代码将在IO线程中执行。

    【讨论】:

      【解决方案3】:

      这应该可以工作

      Observable.interval(1, TimeUnit.SECONDS)
              .flatMap(value -> Observable.just(value)
                      .mergeWith(Completable
                              .fromAction(() -> longOperation(value))
                              .subscribeOn(Schedulers.io())))
              .subscribe(finalConsumer(it));
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-08-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多