正如 Reactor 文档中提到的各种 subscribe 方法:
请记住,由于序列可以是异步的,这将
立即将控制权返回给调用线程。这可以给
在主线程中执行时未调用消费者的印象
或例如单元测试。
这意味着 main 方法已经结束,因此主线程在任何线程能够订阅 Reactive 链之前退出,正如 Piotr 所提到的。
您要做的是等到整个链完成后再打印数组的内容。
这样做的幼稚方法是:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.blockLast();
System.out.println("After: " + arrList);
在这里,您在主线程上阻塞执行,直到处理完 Flux 上的最后一个元素。因此,在您的 ArrayList 完全填充之前,最后一个 System.out 不会执行。
请记住,代码在控制台应用程序中的运行方式与在 Netty 等服务器环境中的运行方式略有不同。使控制台应用程序等待所有订阅启动的唯一方法是block。
但是并行线程上不允许阻塞。所以这种方法在 Netty 环境中是行不通的。在那里,您的服务器将一直运行,直到明确关闭,因此 subscribe 就可以了。
但是,在上面的代码 sn-p 中,您阻塞不仅是为了阻止应用程序退出,而且是在您读取已填充的数据之前等待。
对上述代码的改进如下:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.doOnComplete(() -> System.out.println("After: " + arrList))
.blockLast();
即使在这里,doOnComplete 也可以从反应链外部访问数据。为了防止这种情况,您将在链本身中收集 Flux 的元素,如下所示:
System.out.println("Before.");
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnSuccess(list -> System.out.println("After: " + list))
.block();
再次记住,在 Netty 中运行时(例如 Spring Webflux 应用程序),上述代码将以 subscribe() 结尾。
不过,请注意,从 Flux 切换到 List(或任何 Collection)意味着您正在从响应式范式切换到命令式编程。您应该能够在响应式范式本身内实现任何功能。