【问题标题】:Flux Backlog tasks not executed correctly in the event of an exception inspite of using FlatMapDelayError尽管使用了 FlatMapDelayError,但发生异常时 Flux Backlog 任务未正确执行
【发布时间】:2021-09-14 12:01:50
【问题描述】:

我面临一个问题,我有一个数据通量,其中一些数据可能会产生错误。我需要处理正确的数据,最后为不正确的数据产生错误。我有以下示例代码接近我正在做的事情。期望代码将奇数和偶数整数分开到自己的组中,并进一步将它们减少到最终值。但是当它遇到“零”时会抛出异常。所以最终期望是“零”的例外,奇数和偶数的简化整数。

public class Example1FlatMapDelayError1 {
    public static void main(String[] args) {
        ReactorDebugAgent.init();
        ReactorDebugAgent.processExistingClasses();

    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
        .flatMapDelayError(
            integer -> {
              if (integer == 0) {
                return Mono.error(() -> new IllegalStateException());
              }
              return Mono.just(integer);
            },
            4,
            3)
        .groupBy(integer -> integer % 2 == 0 ? "Even" : "Odd")
        .flatMapDelayError(
            group -> {
              System.out.println("Grouping Key-->" + group.key());

              return group
                  .collectList()
                  .doOnNext(integers -> {
                      System.out.println("Key Vlaue -->" + integers.stream().map(integer -> String.valueOf(integer)).collect(Collectors.joining(",")));
                  })
                  .flatMapMany(
                      integers -> {
                        return Flux.fromIterable(integers).reduce((i1, i2) -> i1 * i2);
                      });
            },
            4,
            3)
        .subscribe(
            out -> System.out.println("Final -> " + out), throwable -> throwable.printStackTrace());

       
    }
}


相反,我看到的行为是整个流程因嵌套异常而中断,并且没有完成任何计算。附加跟踪以供参考。感谢帮助理解我哪里出错了。

JDK :11 反应堆:3.4.9

SLF4J:类路径包含多个 SLF4J 绑定。 ... SLF4J:实际 绑定是类型 [com.apple.jvm.commons.logging.structured.slf4j.MapHidingLog4jLoggerFactory] 分组键-->奇数分组键-->偶数 reactor.core.Exceptions$CompositeException:在多个异常 reactor.core.Exceptions.multiple(Exceptions.java:120) 在 reactor.core.Exceptions.addThrowable(Exceptions.java:92) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:858) 在 reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:990) 在 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:393) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:821) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.onError(FluxFlatMap.java:451) 在 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:393) 在 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:393) 在 reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234) 在 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:393) 在 reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onError(MonoCollectList.java:113) 在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.checkTerminated(FluxGroupBy.java:667) 在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:559) 在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:652) 在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onError(FluxGroupBy.java:702) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.signalAsyncError(FluxGroupBy.java:266) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.checkTerminated(FluxGroupBy.java:427) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:385) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:329) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.onError(FluxGroupBy.java:219) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:821) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465) 在 reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:138) 在 reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:100) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) 在 reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53) 在 reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8411) 在 reactor.core.publisher.Flux.subscribeWith(Flux.java:8584) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8381) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8305) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8275) 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.main(未知 来源)抑制:java.lang.IllegalStateException at com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$0(Example1FlatMapDelayError1.java:22) 抑制: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:装配 来自生产者 [reactor.core.publisher.MonoCollectList] 的跟踪: reactor.core.publisher.GroupedFlux.collectList com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:35) 在以下站点发现错误:|_ GroupedFlux.collectList ⇢ 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:35) |_ Mono.doOnNext ⇢ 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:36) |_ Mono.flux ⇢ 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:39) |_ Flux.flatMapDelayError ⇢ at com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:40) |_ GroupedFlux.collectList ⇢ at com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:35) |_ Mono.doOnNext ⇢ 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:36) |_ Mono.flux ⇢ 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:39) |_ Flux.flatMapDelayError ⇢ at com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$7(Example1FlatMapDelayError1.java:40) 堆栈跟踪:在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.lambda$main$0(Example1FlatMapDelayError1.java:22) 在 reactor.core.publisher.MonoErrorSupplied.call(MonoErrorSupplied.java:61) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:405) 在 reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:127) 在 reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:100) 在 reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) 在 reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53) 在 reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8411) 在 reactor.core.publisher.Flux.subscribeWith(Flux.java:8584) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8381) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8305) 在 reactor.core.publisher.Flux.subscribe(Flux.java:8275) 在 com.apple.ist.empsys.amalgam.config.services.Example1FlatMapDelayError1.main(未知 资料来源)被禁止:[循环参考: java.lang.IllegalStateException] 被禁止:[循环引用: java.lang.IllegalStateException] 与目标 VM 断开连接, 地址:'127.0.0.1:51263',传输:'socket'

进程以退出代码 0 结束

【问题讨论】:

  • 文档说如果映射器抛出异常,错误将被替换。您可以尝试直接从映射函数中抛出异常吗? (我暂时无法自己测试)
  • @amanin 抱歉回复延迟。我尝试通过抛出异常替换“Mono.error()”的建议,但结果相同

标签: java spring-webflux project-reactor


【解决方案1】:

由于您的减少,您有一个重要的参考。您需要重组代码以消除对异常的重新处理。

public class Example1FlatMapDelayError1 {

  public static void main(String[] args) {

    Flux<GroupedFlux<Object, Integer>> flux1 = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
      .flatMapDelayError(integer -> {
        if (integer == 5) {
          return Mono.error(() -> new IllegalStateException("5"));
        }
        return Mono.just(integer);
      }, 4, 3)
      .groupBy(integer -> integer % 2 == 0 ? "Even" : "Odd");

    Flux<Integer> flux2 = flux1.flatMapDelayError(group ->
      group.reduce(0, (i1, i2) -> i1 * i2), 4, 5); // Reduce processes exceptions multiple times...

    flux2.subscribe(x -> System.out.println("Final -> " + x), err -> err.printStackTrace());
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-14
    相关资源
    最近更新 更多