【问题标题】:Project Reactor timeout项目反应堆超时
【发布时间】:2020-01-25 15:13:28
【问题描述】:

我正在一个项目反应堆研讨会上工作,并坚持以下任务:

/**
     * TODO 5
     * <p>
     * For each item call received in colors flux call the {@link #simulateRemoteCall} operation.
     * Timeout in case the {@link #simulateRemoteCall} does not return within 400 ms, but retry twice
     * If still no response then provide "default" as a return value
     */

我无法解决的问题是 Flux 实际上从未抛出 TimeOutException!我可以在控制台日志中观察到这一点:

16:05:09.759 [main] INFO Part04HandlingErrors - Received red delaying for 300 
16:05:09.781 [main] INFO Part04HandlingErrors - Received black delaying for 500 
16:05:09.782 [main] INFO Part04HandlingErrors - Received tan delaying for 300 

我试图调整语句的顺序,尽管它似乎并没有改变行为。注意:此外,我尝试了 timeout() 的重载变体,它接受一个默认值,如果没有发出任何元素,则应该返回该值。

public Flux<String> timeOutWithRetry(Flux<String> colors) {

        return colors
                .timeout(Duration.ofMillis(400))
                //.timeout(Duration.ofMillis(400), Mono.just("default"))
                .retry(2)
                .flatMap(this::simulateRemoteCall)
                .onErrorReturn(TimeoutException.class, "default");

    }

有人可以解释为什么没有发生超时吗?我怀疑该机制在某种程度上没有“绑定”到 flatMap 调用的方法。

为了完整性:辅助方法:

public Mono<String> simulateRemoteCall(String input) {
        int delay = input.length() * 100;
        return Mono.just(input)
                .doOnNext(s -> log.info("Received {} delaying for {} ", s, delay))
                .map(i -> "processed " + i)
                .delayElement(Duration.of(delay, ChronoUnit.MILLIS));
    }

更完整,这是我为验证功能而进行的测试:

@Test
    public void timeOutWithRetry() {
        Flux<String> colors = Flux.just("red", "black", "tan");

        Flux<String> results = workshop.timeOutWithRetry(colors);

        StepVerifier.create(results).expectNext("processed red", "default", "processed tan").verifyComplete();
    }

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    Martin Tarjányi 的回答是正确的,但你也在你的代码中问了为什么

        return colors
                .timeout(Duration.ofMillis(400))
                //.timeout(Duration.ofMillis(400), Mono.just("default"))
                .retry(2)
                .flatMap(this::simulateRemoteCall)
                .onErrorReturn(TimeoutException.class, "default");
    

    没有超时发生。

    原因是如果colors 通量的元素可用,那么调用.timeout(Duration.ofMillis(400)) 没有任何效果,因为timeout 只会传播TimeoutException,如果no 项目在其中发出给定的持续时间为 400 毫秒,但在本例中并非如此。

    因此元素被发射,retry(2) 也没有效果。接下来,您在发出的元素上调用simulateRemoteCall,这需要一些时间,但不会返回错误。您的代码的结果(除了时间差异之外)与您只是在给定的通量上应用地图相同:

    public Flux<String> timeOutWithRetry(Flux<String> colors) {
        return colors.map(s -> "processed " + s);
    }
    

    如果您想看到调用 simulateRemoteCall 的超时,那么您必须在此调用之后添加 timeout 方法。

    您也可以使用concatMap,而不是使用flatMap。区别在于是否应保留顺序,即default 值是否可能出现乱序。

    使用concatMap,答案如下:

    public Flux<String> timeOutWithRetry(Flux<String> colors) {
        return colors.concatMap(
                color -> simulateRemoteCall(color)
                            .timeout(Duration.ofMillis(400))
                            .retry(2)
                            .onErrorReturn("default"));
    }
    

    【讨论】:

      【解决方案2】:

      你是对的,是语句的顺序和位置不正确。 既然你想重试/超时/错误处理远程调用,你应该把这些操作符放在远程调用的Mono而不是Flux

      Flux 上的超时观察后续元素之间经过的时间。但是,当您使用 flatMap 时,您会获得开箱即用的并发性,并且元素之间的延迟实际上为零(假设 colors Flux 来自内存列表)。所以这个操作符不应该直接放在Flux上来达到你的目的。

      重试Flux 意味着它会在出现错误时重新订阅源,这取决于源可能会导致重新处理已处理的元素。相反,您只想重试失败的元素,所以它也应该放在Mono

      public Flux<String> timeOutWithRetry(Flux<String> colors) {
      
          return colors.flatMap(color -> simulateRemoteCall(color).timeout(Duration.ofMillis(400))
                                                                  .retry(2)
                                                                  .onErrorReturn("default"));
      }
      

      【讨论】:

      • 感谢您的回答!我现在确实看到了将 retry() 和 timeout() 运算符添加到 Flux 与由异步函数而不是 Flux 产生的 Mono 的区别。尽管如此,我仍然在日志中看到相同的输出。难道是辅助方法实际上是罪魁祸首?
      • 您的预期输出是什么?
      • 它应该打印“处理过的红色”、“默认”、“处理过的棕褐色”。我在我的帖子中添加了来自研讨会的单元测试。所以超时应该命中第二个字符串并返回“default”而不是“processed black”
      猜你喜欢
      • 2019-10-23
      • 2018-09-01
      • 2021-06-27
      • 2020-11-11
      • 1970-01-01
      • 2013-08-18
      • 1970-01-01
      • 2022-06-11
      • 2016-07-07
      相关资源
      最近更新 更多