【问题标题】:Project reactor publishers initialisation taking time for first time initialisation项目反应堆发布者初始化第一次初始化需要时间
【发布时间】:2021-07-20 15:28:49
【问题描述】:

我最近探索了项目反应器库,并尝试将它用于我的用例,其中我有一个任务列表,有些任务依赖于其他任务的执行,有些任务可以并行执行以提高性能。执行顺序以有向无环图的形式出现。下面是 POC 代码:

public class ReactorPOC {

    public static void main(String args[]) {

        //First time executing mono is taking long time
        run();

        //All subsequent executions not excess time
        run();
        run();
    }

    public static void run() {
        try {
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            Long st = System.currentTimeMillis();
            Publisher one = getTask(60, "one", executorService, st).cache();
            Publisher two = getTask(60, "two", executorService, st).cache();
            Publisher three = getTask(60, "three", executorService, st).cache();
            Publisher four = getTask(60, "four", executorService, st).cache();
            Publisher eight = getTask(60, "eight", executorService, st).cache();
            Publisher five = getTask(60, "five", executorService, st).cache();
            Publisher six = getTask(60, "six", executorService, st).cache();
            Publisher seven = getTask(60, "seven", executorService, st).cache();
            three = Flux.concat(Flux.merge(one, two), three);
            five = Flux.concat(Flux.merge(three, four, eight), five);
            six = Flux.concat(five, six);
            seven = Flux.concat(five, seven);
            Flux last = Flux.merge(one, two, three, four, five, six, seven, eight);
            last.blockLast();
            System.out.println(System.currentTimeMillis() - st);
        } catch (Exception e) {
            System.out.println(e);
        }

    }

    static Mono getTask(int sleep, String task, ExecutorService executorService, long st) {
        return Mono.just(task).doOnSubscribe( i -> {
            System.out.println("Starting " + task + " at " + (System.currentTimeMillis() - st));
            try {
                Thread.sleep(sleep);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Ending " + task + " at " + (System.currentTimeMillis() - st));
        }).subscribeOn(Schedulers.fromExecutor(executorService));
    }

}

这在执行顺序方面按预期工作。但我有两个疑问:

  1. 我已经执行了 3 次图形执行(从主函数调用 3 次)。第一次,它需要大约 1200 毫秒,这太长了,对于所有下一次执行,它需要大约 250 毫秒,这是它应该花费的预期时间。我试图理解为什么第一次需要这么长时间。

  2. 如果有任何任务中断,我希望有办法抛出异常,而不是按执行顺序继续执行。有没有办法做到这一点?我有一种方法,我想到了我将在哪里保留共享对象并设置一个有错误的字段,所有下一个任务将首先查看该字段,然后决定不执行该任务。我想检查是否有更好的方法。

请帮助澄清上述两个问题。

另外,我对这个库和整个反应范式都是新手。因此,如果对上述代码有任何意见/建议,那就太好了。

谢谢:)

【问题讨论】:

  • 我运行了您的代码,但在执行任何任务时没有发现任何特别明显的膨胀。
  • 我刚刚再次运行它,在 run() 中的 try 块末尾打印的输出分别为 run() 上的 3 次调用提供了 1342、254、244 毫秒。第一次执行花费了 1342 毫秒,与接下来的 2 次执行大约 250 毫秒相比,这很奇怪。运行上面的代码没花那么长时间吗?
  • 对我来说,整个事情在 try 块的末尾需要 437、248 和 257。
  • 哦,这很奇怪。但无论如何,即使对你来说,如果你多次运行它,与下一次执行相比,第一次会花费更多时间,但对我来说并没有那么高。因此,寻找相同的原因,因为我将使用它的应用程序对延迟敏感(只是试图确保即使是第一次执行也很快:))
  • 您不需要在这种安排中使用.cache()。如果我没记错的话,如果你将 all Monos 分配给一个不同的变量名,你就不会使用任务变量两次,这表明缓存不是必需的。

标签: java rx-java reactive-programming project-reactor reactive


【解决方案1】:

就我在运行 AsyncProfiler 并查看火焰图时所见,它似乎纯粹是类加载的一个因素。

代码可以改进,特别是删除缓存并避免在doOnSubscribe 内阻塞(这是一种代码味道),但这并没有太大改变交易。

为了更好地演示,我将run() 方法复制粘贴为run2() 并让main 执行run1() 然后run2()(它们是完全相同的代码)。然后我们可以观察到run1() 总是需要更多的时间,区别在于类加载:

如果我们首先启动run2(),它会成为火焰图中最突出的一个,并带有类加载延迟:

【讨论】:

  • 为什么 doOnSubscribe 被认为是代码异味,您建议使用什么替代方案?另外,您对第 2 点有任何指示吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-08
  • 1970-01-01
  • 2020-09-16
  • 1970-01-01
相关资源
最近更新 更多