【问题标题】:Java Reactor - conditional stream executionJava Reactor - 条件流执行
【发布时间】:2019-03-17 00:26:31
【问题描述】:

我想知道如何使用 Reactor 创建“逻辑流”。

假设我要实现以下场景:

作为输入,我有要保存在数据库中的对象。作为输出,我想得到表示执行消息的 Mono。

  • 选项 1:如果要保存的对象的所有字段都已填满,则我执行其他操作,将其保存到数据库并最终返回“Success”消息

  • 选项 2:如果要保存的对象至少有一个字段未填写,我返回“Error

我已经创建了这样的代码:

Mono<String> message = Mono.just(new User("Bob the Reactor master")) // user with name = can be saved
    .flatMap(user -> {
        if(user.getName() != null && user.getName().length() > 1){
            // Perform additional operations e.g. user.setCreatedDate(new Date())
            // Save to repository e.g. repository.save(user)
            return Mono.just("Success!");
        }
        else{
            return Mono.just("Error!");
        }
    })
    .doOnNext(System.out::println); // print stream result

message.subscribe();

此代码是否 100% 响应式(具有所有优点)?如果没有,那它会是什么样子?

【问题讨论】:

  • 因为你没有任何异步部分,所以这里不需要reactor。
  • 假设此代码位于微服务 POST 处理程序中。
  • 在这种情况下不需要else 子句。
  • @Wicia 将其作为处理程序上的 \@Valid 条件进行检查不是更好吗?

标签: java reactive-programming project-reactor


【解决方案1】:

答案取决于您评论的存储库。

  • Repository 是非阻塞的,返回 Mono 或 Flux

    您应该订阅它然后返回 Success Mono。在你的 if 语句中:

    return repository.save(user).then(Mono.just("Success!"));
    
  • 存储库被阻塞

    您应该使您的存储库调用非阻塞,将其执行移动到单独的线程。 Reactor 的方法是用 Mono 包装它并订阅弹性调度程序或您的自定义调度程序。

    Mono<String> message = Mono.just(new User("Bob the Reactor master"))
            .filter(user -> user.getName() != null && user.getName().length() > 1)
            .map(user -> user) // Perform additional operations e.g. user.setCreatedDate(new Date())
            .flatMap(user -> Mono.fromRunnable(() -> repository.save(user))
                    .subscribeOn(Schedulers.elastic())
                    .then(Mono.just("Success!")))
            .switchIfEmpty(Mono.just("Error!"));
    

【讨论】:

    猜你喜欢
    • 2018-09-26
    • 2021-05-01
    • 1970-01-01
    • 2021-06-02
    • 1970-01-01
    • 1970-01-01
    • 2018-01-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多