【问题标题】:Spring Webflux - reactive repository saveAll(Iterable<S>) vs saveAll(Publisher<S>)Spring Webflux - 反应式存储库 saveAll(Iterable<S>) 与 saveAll(Publisher<S>)
【发布时间】:2020-09-01 13:55:12
【问题描述】:

关于 webflux 响应式存储库的小问题,尤其是关于方法 saveAll Flux saveAll(Iterable var1);与 Flux saveAll(Publisher var1);

想比较一下,我写了以下:

@Controller
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

    @Autowired
    private SomeReactiveRepository someReactiveRepository;

    @PostMapping(path = "/saveListInsideMono", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<QuestionResponse> saveListInsideMono(@RequestBody Mono<QuestionRequest> questionRequestMono) {
        //just doing some business transformation on the list inside the mono
        Mono<List<String>> enhancedStringListMono = questionRequestMono.map(questionRequest -> enhance(questionRequest));
        //take the pojo inside the mono and map it to a saveAllAndConvertToResponse method (see next method)
        Mono<QuestionResponse> questionResponseMono = enhancedStringListMono.map(enhancedStringList -> saveAllAndConvertToResponse(enhancedStringList));
        return questionResponseMono;
    }

    private QuestionResponse saveAllAndConvertToResponse(List<String> enhancedStringList) {
        // use the repository <S extends T> Flux<S> saveAll(Iterable<S> var1); + subscribe
        return someReactiveRepository.saveAll(enhancedStringList).thenReturn(new QuestionResponse(enhancedStringList));
    //this also works but not good to subscribe
        //someReactiveRepository.saveAll(enhancedStringList).subscribe();
        //return new QuestionResponse(enhancedStringList);
    }

    @PostMapping(path = "/saveFlux", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<QuestionResponse> saveFlux(@RequestBody Mono<QuestionRequest> questionRequestMono) {
        //just doing some business transformation on the list inside the mono
        Mono<List<String>> enhancedStringListMono = questionRequestMono.map(questionRequest -> enhance(questionRequest));
        // use the repository <S extends T> Flux<S> saveAll(Publisher<S> var1); to save the flatMapMany + fromIterable directly
        Flux<String> enhancedStringFlux = someReactiveRepository.saveAll(enhancedStringListMono.flatMapMany(Flux::fromIterable));
        Mono<QuestionResponse> questionResponseMono = enhancedStringFlux.collectList().map(enhancedString -> convertToResponse(enhancedString));
        return questionResponseMono;
    }

    private QuestionResponse convertToResponse(List<String> enhancedStringList) {
        //return the object needed
        return new QuestionResponse(enhancedStringList);
    }

    private static List<String> enhance(QuestionRequest questionRequest) {
        //dummy business transformation logic
        List<String> baseList = questionRequest.getList();
        List<String> enhancedList = baseList.stream().map(oneString -> "enhanced" + oneString).collect(Collectors.toList());
        return enhancedList;
    }

    public class QuestionRequest {
        private List<String> list;

        public List<String> getList() {
            return list;
        }
    }

    public class QuestionResponse {
        private List<String> enhancedList;

        public QuestionResponse(List<String> enhancedList) {
            this.enhancedList = enhancedList;
        }
    }

}

就“正确性”而言,两个代码都在执行预期的操作。一切都成功持久化。

但就性能、反应式范式、数据库的 IO 利用率、Netty Core 的使用而言,什么是“最佳”解决方案,为什么?

谢谢

【问题讨论】:

  • 不要订阅,消费者是订阅者(在这种情况下是调用客户端)。 return someReactiveRepository.saveAll(enhancedStringList).thenReturn(new QuestionResponse(enhancedStringList));
  • @ThomasAndolfquestion 已更新,谢谢。我只想强调这个问题的目的,即一个与另一个的优缺点。我的意思是,我可以像对第一个 /saveListInsideMono 所做的那样独立地优化它们中的每一个,但实际上比较是我想了解的。谢谢
  • 你好@K.Nicholas,可悲的是,不是真的。我阅读了整个页面,对一般的 Mono 与 Flux 有了更多的了解,但是这个用例确实涉及反应式存储库和整个处理的性能
  • @PatPatPat - 真的是同一个问题。 Mono 是管道中的单个对象,Flux 是管道中的多个对象。我不清楚使用Mono 的背压将如何工作(如果有的话),并且所涉及的数据量是您问题的核心。这个问题也可以因为过于宽泛而被关闭。您可以在projectreactor.io/docs/core/release/reference 找到更多信息

标签: java spring-webflux spring-repositories


【解决方案1】:

这完全取决于您当前拥有的对象。如果您有 Flux 的对象,请使用采用 Publisher 的 saveAll 方法。如果您有实际的 Collection 对象,请使用采用 Iterable 的 saveAll 方法。

例如,如果您查看实现 SimpleReactiveCassandraRepository,则采用 Iterable 的 saveAll 实现只是将其包装在 Flux 中并委托给接受 Flux 的 saveAll 方法

public <S extends T> Flux<S> saveAll(Iterable<S> entities) {

    Assert.notNull(entities, "The given Iterable of entities must not be null");

    return saveAll(Flux.fromIterable(entities));
}

因此,在 IO 使用率或 netty 核心使用率方面应该没有区别。此外,两者都遵循响应式范式。

SimpleReactiveCassandraRepository Code

【讨论】:

  • 您好 Michael,感谢您的评论,这让我确信这两者是可以互换的。虽然我看到两者的实现是相同的,但我的问题也与存在集合但在单声道内的情况有关。我们应该将其设为 Flux 和 saveAll Publisher,还是从单声道中提取集合并使用 saveAll 集合?
  • 我想问你为什么在 Monos 上有一个列表而不是使用 Flux?
  • 当然。在控制器层,它需要:@RequestBody Mono questionRequestMono。 QuestionRequest 有一堆字段,比如一些 int ID、一些 String 类别、一些长时间戳、一堆其他东西(不是集合)。但它也包含一个集合作为 POJO 的一部分,主要业务逻辑必须针对这个集合完成。客户/发件人是第三方,无法更改他们发送此信息的方式。他们不能发送一个通量或多个 QuestionRequest。他们只能发送一个包含列表的 QuestionRequest 对象
  • 因此我的问题是,如果可以的话,保存 INSIDE QuestionRequest 列表的“最佳”方法是什么,特别是在客户端发送的 Mono 中?
  • 如果没有看到确切的代码本身,我不确定我是否真的能就此提出建议。
猜你喜欢
  • 2022-12-27
  • 2018-09-26
  • 2019-02-28
  • 1970-01-01
  • 2020-12-24
  • 2015-11-23
  • 2021-09-22
  • 1970-01-01
相关资源
最近更新 更多