【问题标题】:How do you build a reactive in-memory repository with Spring WebFlux?如何使用 Spring WebFlux 构建反应式内存存储库?
【发布时间】:2020-12-30 10:26:42
【问题描述】:

我正在尝试实现一个反应式的内存存储库。这应该如何实现?

这是我正在尝试做的一个阻塞版本

@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {

    private final List<Event> events;

    @Override
    public void save(final Mono<Event> event) {
        events.add(event.block());
        // event.subscribe(events::add); <- does not do anything
    }

    @Override
    public Flux<Event> findAll() {
        return Flux.fromIterable(events);
    }

}

我尝试使用event.subscribe(events::add);,但该事件未添加到列表中(也许我在那里遗漏了什么?)

也许events 应该是Flux&lt;Event&gt; 类型,并且有一些方法可以将Mono&lt;Event&gt; 添加到Flux&lt;Event&gt;

【问题讨论】:

    标签: java spring java-8 spring-webflux project-reactor


    【解决方案1】:

    我建议为此使用Sink

      public static class InMemEventRepository {
        private final Scheduler serializerScheduler = Schedulers.single();
    
        private final Sinks.Many<Event> events = Sinks.many().replay().all();
    
        public void save(Mono<Event> event) {
          event
              .publishOn(serializerScheduler) // If event will be published on multiple threads you need to serialize them
              .subscribe(x -> events.emitNext(x, EmitFailureHandler.FAIL_FAST)); 
        }
    
        public Flux<Event> findAll() {
          return events.asFlux();
        }
      }
    

    这是反应堆 3.4。对于旧版本,您可以使用处理器,但现在已弃用它们。一般来说,接收器更容易使用,但它们不会序列化来自多个线程的发射。这就是我使用调度程序的原因。

    另请参阅this answer,了解从 Sink 序列化发射的另一种方法

    【讨论】:

      【解决方案2】:

      如果您选择Flux.fromIterable,您将只能订阅以前的活动,但您将失去未来的活动

      我之前做过一个 PoC 试图获得类似的效果,你可以在 https://github.com/AlbertoSH/KeepMeUpdated 中查看它

      主要思想是有一个中心点,事件在该中心点发生并订阅存储库。每当您订阅findAll,您将获得List&lt;Item&gt; 的无限流。任何保存的项目都会触发一个新事件,任何订阅findAll 的人都会得到它

      注意这个 repo 使用的是 RxJava,所以可能需要一些到 reactor 的端口

      【讨论】:

        猜你喜欢
        • 2019-02-28
        • 1970-01-01
        • 1970-01-01
        • 2022-12-17
        • 1970-01-01
        • 2020-07-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多