【问题标题】:Flux.buffer() doesnt work with switchIfEmptyFlux.buffer() 不适用于 switchIfEmpty
【发布时间】:2020-02-27 14:21:32
【问题描述】:

我有一个场景,我将使用从DB 获取实体列表

repository.getAllByIds(ids)

这将返回Flux<Entity>

如果 Flux 为空,则我需要致电 handleAllEntitiesNotFound() 否则我需要致电 handleNotFoundEntities()

repository.getAllByIds(ids)
                .buffer()
                .switchIfEmpty(__ -> handleAllEntitiesNotFound(ids, erroneousEntities))
                .flatMap(list -> handleNotFoundEntities(list))


private Flux<Entity> handleAllEntitiesNotFound(List<String> ids, List<ResponseError> erroneousEntities) {
    Flux.fromIterable(ids).subscribe(id -> erroneousEntities.add(new ResponseError("Not Found", "Not Found", id)));
    return Flux.empty();
}

我正在使用buffer() 将列表收集到Flux&lt;List&lt;Entity&gt;&gt;

问题是,当我调用服务时,它会停止,没有响应,没有日志,没有任何内容,如果我删除了.switchIfEmpty(__ -&gt; handleAllEntitiesNotFound(ids, erroneousEntities)) 行,它可以工作并返回响应,但不处理handleAllEntitiesNotFound

buffer()switchIfEmpty() 一起使用可能会出现什么问题

【问题讨论】:

    标签: java spring rest project-reactor flux


    【解决方案1】:

    我认为你在这里得出了错误的结论 - buffer()switchIfEmpty() 一起工作没有问题:

    Flux.empty()
            .buffer()
            .switchIfEmpty(Mono.just(List.of(1)))
            .subscribe(System.out::println); //Prints "[1]"
    

    但是,您的handleAllEntitiesNotFound() 方法非常可疑。您似乎正在传入一个现有列表,创建一个新的 Flux 以添加到它,然后返回一个空的 Flux。该示例无法运行,因此无法缩小确切原因的范围,但有几点很可能是罪魁祸首(单独或串联):

    • 改变传递到反应流中的现有对象通常被认为是错误的形式。返回一个 new 列表更容易、更安全(当反应流完成时,您可以根据需要将该列表与另一个列表合并。)
    • 您创建Flux 只是为了从一个列表中读取,并将元素添加到另一个列表中。这很令人困惑,而且意义不大。只需使用标准 Java 流(即ids.stream().map(id -&gt; new ResponseError("Not Found", "Not Found", id)).collect(Collectors.toList())。)
    • 您正在返回Flux.empty(),这几乎肯定是没有回复的原因。人们通常会期望 switchIfEmpty() 返回一个非空的 Flux,除非您故意将其用作副作用。
    • handleNotFoundEntities 似乎是一个奇怪的方法名称选择,它似乎传递了被发现的实体

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-08-07
      • 1970-01-01
      • 2021-03-29
      • 1970-01-01
      • 2021-08-30
      • 2021-11-28
      • 1970-01-01
      • 2014-12-25
      相关资源
      最近更新 更多