【发布时间】:2021-07-20 15:28:49
【问题描述】:
我最近探索了项目反应器库,并尝试将它用于我的用例,其中我有一个任务列表,有些任务依赖于其他任务的执行,有些任务可以并行执行以提高性能。执行顺序以有向无环图的形式出现。下面是 POC 代码:
public class ReactorPOC {
public static void main(String args[]) {
//First time executing mono is taking long time
run();
//All subsequent executions not excess time
run();
run();
}
public static void run() {
try {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Long st = System.currentTimeMillis();
Publisher one = getTask(60, "one", executorService, st).cache();
Publisher two = getTask(60, "two", executorService, st).cache();
Publisher three = getTask(60, "three", executorService, st).cache();
Publisher four = getTask(60, "four", executorService, st).cache();
Publisher eight = getTask(60, "eight", executorService, st).cache();
Publisher five = getTask(60, "five", executorService, st).cache();
Publisher six = getTask(60, "six", executorService, st).cache();
Publisher seven = getTask(60, "seven", executorService, st).cache();
three = Flux.concat(Flux.merge(one, two), three);
five = Flux.concat(Flux.merge(three, four, eight), five);
six = Flux.concat(five, six);
seven = Flux.concat(five, seven);
Flux last = Flux.merge(one, two, three, four, five, six, seven, eight);
last.blockLast();
System.out.println(System.currentTimeMillis() - st);
} catch (Exception e) {
System.out.println(e);
}
}
static Mono getTask(int sleep, String task, ExecutorService executorService, long st) {
return Mono.just(task).doOnSubscribe( i -> {
System.out.println("Starting " + task + " at " + (System.currentTimeMillis() - st));
try {
Thread.sleep(sleep);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Ending " + task + " at " + (System.currentTimeMillis() - st));
}).subscribeOn(Schedulers.fromExecutor(executorService));
}
}
这在执行顺序方面按预期工作。但我有两个疑问:
-
我已经执行了 3 次图形执行(从主函数调用 3 次)。第一次,它需要大约 1200 毫秒,这太长了,对于所有下一次执行,它需要大约 250 毫秒,这是它应该花费的预期时间。我试图理解为什么第一次需要这么长时间。
-
如果有任何任务中断,我希望有办法抛出异常,而不是按执行顺序继续执行。有没有办法做到这一点?我有一种方法,我想到了我将在哪里保留共享对象并设置一个有错误的字段,所有下一个任务将首先查看该字段,然后决定不执行该任务。我想检查是否有更好的方法。
请帮助澄清上述两个问题。
另外,我对这个库和整个反应范式都是新手。因此,如果对上述代码有任何意见/建议,那就太好了。
谢谢:)
【问题讨论】:
-
我运行了您的代码,但在执行任何任务时没有发现任何特别明显的膨胀。
-
我刚刚再次运行它,在 run() 中的 try 块末尾打印的输出分别为 run() 上的 3 次调用提供了 1342、254、244 毫秒。第一次执行花费了 1342 毫秒,与接下来的 2 次执行大约 250 毫秒相比,这很奇怪。运行上面的代码没花那么长时间吗?
-
对我来说,整个事情在 try 块的末尾需要 437、248 和 257。
-
哦,这很奇怪。但无论如何,即使对你来说,如果你多次运行它,与下一次执行相比,第一次会花费更多时间,但对我来说并没有那么高。因此,寻找相同的原因,因为我将使用它的应用程序对延迟敏感(只是试图确保即使是第一次执行也很快:))
-
您不需要在这种安排中使用
.cache()。如果我没记错的话,如果你将 all Monos 分配给一个不同的变量名,你就不会使用任务变量两次,这表明缓存不是必需的。
标签: java rx-java reactive-programming project-reactor reactive