【问题标题】:subscribeOn(Schedulers.parallel()) is not workingsubscribeOn(Schedulers.parallel()) 不工作
【发布时间】:2019-01-16 08:12:04
【问题描述】:

我正在学习反应堆核心并关注此https://www.baeldung.com/reactor-core

ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(arrList::add);

System.out.println("After: " + arrList);

当我执行上面的代码行时,给出了。

 Before: []
 [DEBUG] (main) Using Console logging
 After: []

以上代码行应该在另一个线程中开始执行,但它根本不工作。 有人可以帮我吗?

【问题讨论】:

    标签: reactive-programming spring-webflux reactive-streams


    【解决方案1】:

    正如 Reactor 文档中提到的各种 subscribe 方法:

    请记住,由于序列可以是异步的,这将 立即将控制权返回给调用线程。这可以给 在主线程中执行时未调用消费者的印象 或例如单元测试。

    这意味着 main 方法已经结束,因此主线程在任何线程能够订阅 Reactive 链之前退出,正如 Piotr 所提到的。

    您要做的是等到整个链完成后再打印数组的内容。

    这样做的幼稚方法是:

        ArrayList<Integer> arrList = new ArrayList<>();
        System.out.println("Before: " + arrList);
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .subscribeOn(Schedulers.parallel())
                .doOnNext(arrList::add)
                .blockLast();
    
        System.out.println("After: " + arrList);
    

    在这里,您在主线程上阻塞执行,直到处理完 Flux 上的最后一个元素。因此,在您的 ArrayList 完全填充之前,最后一个 System.out 不会执行。

    请记住,代码在控制台应用程序中的运行方式与在 Netty 等服务器环境中的运行方式略有不同。使控制台应用程序等待所有订阅启动的唯一方法是block

    但是并行线程上不允许阻塞。所以这种方法在 Netty 环境中是行不通的。在那里,您的服务器将一直运行,直到明确关闭,因此 subscribe 就可以了。

    但是,在上面的代码 sn-p 中,您阻塞不仅是为了阻止应用程序退出,而且是在您读取已填充的数据之前等待。

    对上述代码的改进如下:

        ArrayList<Integer> arrList = new ArrayList<>();
        System.out.println("Before: " + arrList);
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .subscribeOn(Schedulers.parallel())
                .doOnNext(arrList::add)
                .doOnComplete(() -> System.out.println("After: " + arrList))
        .blockLast();
    

    即使在这里,doOnComplete 也可以从反应链外部访问数据。为了防止这种情况,您将在链本身中收集 Flux 的元素,如下所示:

        System.out.println("Before.");
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .subscribeOn(Schedulers.parallel())
                .collectList()
                .doOnSuccess(list -> System.out.println("After: " + list))
        .block();
    

    再次记住,在 Netty 中运行时(例如 Spring Webflux 应用程序),上述代码将以 subscribe() 结尾。

    不过,请注意,从 Flux 切换到 List(或任何 Collection)意味着您正在从响应式范式切换到命令式编程。您应该能够在响应式范式本身内实现任何功能。

    【讨论】:

      【解决方案2】:

      我认为有些混乱。当您致电subscribeOn(Schedulers.parallel()) 时。您指定要在不同线程上接收项目。此外,您必须放慢代码速度,以便订阅 cen 真正启动(这就是我添加 Thread.sleep(100) 的原因)。如果您运行我通过的代码,它就可以工作。你看reactor中没有神奇的同步机制。

          ArrayList<Integer> arrList = new ArrayList<Integer>();
      
          Flux.just(1, 2, 3, 4)
                  .log()
                  .map(i -> i * 2)
                  .subscribeOn(Schedulers.parallel())
                  .subscribe(
                          t -> {
                              System.out.println(t + " thread id: " + Thread.currentThread().getId());
                              arrList.add(t);
                          }
                  );
          System.out.println("size of arrList(before the wait): " + arrList.size());
          System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
          Thread.sleep(100);
          System.out.println("size of arrList(after the wait): " + arrList.size());
      

      如果您想将您的项目添加到并行反应器列表中不是一个好的选择。最好在 java 8 中使用并行流。

      List<Integer> collect = Stream.of(1, 2, 3, 4)
                      .parallel()
                      .map(i -> i * 2)
                      .collect(Collectors.toList());
      

      您发布的那个教程在并发部分方面不是很精确。对于作者的信用,他/她说会有更多的文章来。但我认为根本不应该发布那个特定的例子,因为它会造成混乱。我建议不要太相信互联网上的资源:)

      【讨论】:

      • 非常感谢您宝贵的 cmets,现在我知道幕后发生了什么。
      猜你喜欢
      • 2015-08-22
      • 1970-01-01
      • 2019-04-27
      • 1970-01-01
      • 1970-01-01
      • 2020-08-06
      • 1970-01-01
      • 2017-01-11
      • 1970-01-01
      相关资源
      最近更新 更多