【问题标题】:How to chain reactor subscribers如何链接 reactor 订阅者
【发布时间】:2019-07-01 14:56:48
【问题描述】:

我有一个现有的接口链,我想以reactor 的身份运行,而不是管理我自己的线程和队列

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...

// Works well to lookup users in parallel. 
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

我如何链接该结果,以便它调用 UsersHandler 批量 User

【问题讨论】:

    标签: java spring spring-boot project-reactor reactor


    【解决方案1】:

    订阅某些东西会触发链,因此您通常不能“链接”订阅者,它们是链中的最后一件事。

    如果这样想,你设置了你的反应式管道,当你subscribe时,你触发管道启动,链会产生结果。

    在网络服务器中,subscriber 通常是调用客户端,当客户端 subscribes 触发服务器中的事件链时,将发布数据。

    Flux 有点像 1 到 n 个 Monos 的列表。可以说,Mono/Flux 中的每个对象都有许多“状态”。这些是SuccessErrorCancelNextCompleted 等等。

    Mono/Flux 内部进入Success 状态时,它将发出其中的值。当单声道解决了某些问题时,Mono 通常会转到 Success

    当您声明Flux.just("userA", "userB", "userC") 时,您基本上是在要求通量解析您输入的输入。放置字符串会立即解决,因此通量将进入Success 状态并在出现Subscribes 时立即开始发射字符串。所以你所要做的就是在某人Subscribes之后声明你想要发生的链。

    这可以通过几种不同的方式完成,当您想要做某事并更改值时,例如您想要从 string 更改为 user,我们通常使用 map

    如果我们只想对每个对象做一些事情而不返回任何东西,我们可以使用doOnNext

    Flux.just("userA", "userB", "userC")
                .parallel(2)
                .runOn(Schedulers.parallel())
                .map(userString -> {
                    return lookupService.lookup(userString);
                })
                .doOnNext(user -> {
                    // if you want to do something on each user
                    // will return void so if you want to log something
                    // or handle each user
                }).subscribe();
    

    订阅应该是链条中的最后一件事。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-14
      相关资源
      最近更新 更多