【问题标题】:Flux.onErrorContinue() null objectFlux.onErrorContinue() 空对象
【发布时间】:2023-03-10 08:56:01
【问题描述】:

我对 onErrorContinue() 有一个问题,即传递给 Biconsumer 的对象为空。

我正在使用 Spring boot 2.1.13.RELEASE 和响应式 mongo,反应堆核心版本为 3.2.15.RELEASE。

当调用数据库以检索具有 id 的记录并使用运算符 switchIfEmtpy() 我使用 Mono.error() 抛出异常并且下游我尝试使用 onErrorContinue( )。

下面的代码解释了这个问题:

public static void main(String[] args) {
    Flux.range(1, 10)
            .flatMap(integer -> mapInteger(integer))
            .doOnNext(System.out::println)
            .onErrorContinue((throwable, o) -> System.out.println("error with " + o)) // o is null
            .subscribe();
}

public static Mono<Integer> mapInteger(Integer num) { // This is here to simulate the db call
    return Mono.just(num)
            .flatMap(t -> {
                if (t == 5)
                    return Mono.empty();
                else
                    return Mono.just(num * 2);
            })
            .switchIfEmpty(Mono.error(new RuntimeException("Error happened while mapping integer!")));
}

这将打印以下值:

2
4
6
8
error with null
12
14
16
18
20

PS。当流中发生另一个错误时,我没有问题。

更新: mapInteger() 用于模拟以下对响应式mongo 存储库的调用:

public Mono<MetaData> getFromDbByKey(String key) {
        return repository
            .findByKeyAndDeletedIsFalse(key)
            .switchIfEmpty(Mono.error(() -> new RuntimeException()));
    }

并且对 getFromDbByKey() 的调用返回这个元数据,我需要将它映射到主流程中的通量。

通过 onErrorContinue,我们捕获 Throwable,并根据其类型对每种类型的错误进行不同的处理。

【问题讨论】:

  • 作为一种解决方法,您可以更改代码:.flatMap(t -&gt; { if (t == 5) throw new RuntimeException("Error happened while mapping integer value " + t);,您可以在onErrorContinue 中记录此异常消息
  • onErrorContinue 是高级运算符,您是否 100% 确定需要它并且不能使用其他 onError 处理程序之一? (包括flatMap内,附加到mapInteger
  • null 来自这样一个事实,即在出现错误的地方,发生的唯一信号是onComplete。所以没有数据信号传递给处理程序,因此null
  • @VasylSarzhynskyi 感谢您的评论。不幸的是,我不能这样做......我还更新了帖子以澄清为什么我不能这样做,因为在我的真实代码中它是对数据库的调用
  • @SimonBaslé 感谢您的解释。是的,我们需要使用它,因为我们处理错误的方式不同……在某些情况下,我们会保存到数据库中,稍后调度程序会决定要做什么,而在其他一些情况下,日志就足够了。学院建议的另一个解决方案是创建一个自定义 RuntimeException 并将导致问题的对象包含在其中。

标签: java spring-boot spring-webflux project-reactor reactive


【解决方案1】:

如果 t==5 ,您不能指望在返回 Mono.empty() 时收到“项目” t 吗?

此代码将打印您需要的内容。

    Flux.range(1, 10)
        .flatMap(integer -> mapInteger(integer))
        .doOnNext(System.out::println)
        .onErrorContinue((throwable, o) -> {
          System.out.println(throwable.getMessage());
        })
        .subscribe();
  }
  public static Mono<Integer> mapInteger(Integer num) { // This is here to simulate the db call
    return Mono.just(num)
        .flatMap(t -> {
          if (t == 5)
            return Mono.empty();
          else
            return Mono.just(num * 2);
        })
        .switchIfEmpty(Mono.error(new RuntimeException("error with " + num)));
  }

打印:

2
4
6
8
error with 5
12
14
16
18
20

在你实际的 mongoDB 调用中,你可以有类似的东西:

public Mono<MetaData> getFromDbByKey(String key) {
        return repository
            .findByKeyAndDeletedIsFalse(key)
            .switchIfEmpty(Mono.error(() -> new RuntimeException("Couldnt find metadata which is not deleted for the key: " + key)));
    }

【讨论】:

  • 这在我的真实代码中是做不到的,请检查帖子的更新。
  • @AhmedAli 好的。明白了。请检查答案。我更新了
  • 感谢您的回答,我认为这与同事给我的建议非常相似,请检查我添加的答案。
【解决方案2】:

这是一位同事向我建议的解决方案/解决方法:

public static void main(String[] args) {

    Flux.range(1, 10)
        .flatMap(integer -> mapInteger(integer)
                           .switchIfEmpty(Mono.error(
                                new CustomException("Error happened while mapping integer!", 
                            integer))))
        .doOnNext(System.out::println)
        .onErrorContinue((throwable, o) -> {
            if (CustomException.class.isInstance(throwable))
                System.out.println("Object to blame " + ((CustomException)throwable).getBlamedObject());
            System.out.println("error with " + o);
        })
    .subscribe();
  }

  public static Mono<Integer> mapInteger(Integer num) {
      return Mono.just(num)
          .flatMap(t -> {
              if (t == 5)
                  return Mono.empty();
              else
                  return Mono.just(num * 2);
          });
  }

  public static class CustomException extends RuntimeException {
      private Integer blamedObject;

      public CustomException(String message, Integer blamedObject) {
          super(message);
          this.blamedObject = blamedObject;
      }

      public Integer getBlamedObject() {
          return blamedObject;
      }
  }

这将返回:

2
4
6
8
Object to blame 5
error with null
12
14
16
18
20

解决方案/解决方法是创建一个自定义异常来接受有错误的对象并将 switchIfEmpty() 移动到主要流程,以便我可以获取导致问题的对象以及以后在 onErrorContinue() 中,我可以检查异常类型并获取导致问题的对象。

这对我有用,因为示例中的 mapInteger() 只是对响应式 mongo 存储库的模拟,如果在原始帖子中没有找到更新的结果,它将返回空单声道。

【讨论】:

  • 有很多方法可以做同样的事情。我给了你一个方法,它不需要新的自定义异常类。
  • 你说得对,在简单的情况下我不需要新课程。在我的情况下,它涉及一条 kafka 消息,我需要将其传递给 onErrorContinue() 以检索所需的数据并将其保存在数据库中,以便稍后确定合适的重试算法。
猜你喜欢
  • 2019-08-30
  • 1970-01-01
  • 2012-08-25
  • 2013-11-12
  • 1970-01-01
  • 2015-11-20
  • 2018-11-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多