【问题标题】:How to wait until JavaRx2 Flowable will finish all its tasks?如何等到 JavaRx2 Flowable 完成所有任务?
【发布时间】:2020-01-17 17:40:53
【问题描述】:

我正在尝试学习 RxJava2 库的基础知识,现在我被困在以下时刻:
我已经通过Flowable.generate(...) 生成了myFlowable,现在我需要等待所有任务完成执行,然后才能继续进行。
这是展示问题的代码:

myFlowable.parallel()
            .runOn(Schedulers.computation())
            .map(val -> myCollection.add(val))
            .sequential()
            .subscribe(val -> {
                System.out.println("Thread from subscribe: " + Thread.currentThread().getName());
                System.out.println("Value from subscribe: " + val.toString());
            });
    System.out.println("Before sleep - Number of objects: " + myCollection.size());
    try {
        Thread.sleep(1000);
        System.out.println("After sleep - Number of objects: " + myCollection.size());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

我完成所有任务并将结果添加到集合中。如果我在 myFlowable 块之后检查集合大小,那么它会有所不同,如果我在小的 Thread.sleep() 之后检查它。有什么方法可以检查所有任务是否已完成执行,我们可以继续进行吗?任何帮助或指导将不胜感激。

【问题讨论】:

    标签: java multithreading asynchronous rx-java2


    【解决方案1】:

    由于 RxJava 是异步的,observable 下面的 java 代码将运行,而 observable 将在不同的线程中运行,这就是为什么如果您想在 Flowable 完成发送数据时收到通知,您应该在 RxJava 流中执行此操作。为此,您有一个操作员 .doOnComplete 这里有一个示例如何检测流何时完成

            Flowable.range(0, 100).parallel()
                .runOn(Schedulers.computation())
                .map(integer -> {
    
                    return integer;
                })
                .sequential()
                .doOnComplete(() -> {
                    System.out.println("finished");
                })
                .subscribe(integer -> System.out.println(integer));
    

    【讨论】:

      【解决方案2】:

      您可以使用 AtomicBoolean,将其初始化为 false,然后使用 doFinally() 将其设置为 true。

      doFinally() 在 Observable 发出 onError 或 onCompleted 信号之后调用,或者它被下游处理掉。

      然后让主线程休眠,直到completed 的值为真。

      使用您的示例:

      AtomicBoolean completed = new AtomicBoolean(false);
      
      myFlowable.parallel()
                  .runOn(Schedulers.computation())
                  .map(val -> myCollection.add(val))
                  .sequential()
                  .doFinally(() -> completed.set(true))
                  .subscribe(val -> {
                      ...
                  });
          ...
      try {
         while(!completed.get()){
             Thread.sleep(1000);
             ...
         }
        ...
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      

      【讨论】:

        【解决方案3】:

        使用Flowable::blockingSubscribe() - 将当前Flowable运行到一个终端事件,忽略任何值并重新抛出任何异常。

        http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#blockingSubscribe--

        【讨论】:

          猜你喜欢
          • 2011-03-17
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-04-14
          • 1970-01-01
          • 2020-05-13
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多