【问题标题】:How to start subscription when another publisher is ready with project Reactor当另一个发布者准备好项目 Reactor 时如何开始订阅
【发布时间】:2021-03-12 18:49:38
【问题描述】:

使用 Reactor,我有一个 Mono 和一个 FluxMono 在一个频道上进行一些轮询,Flux 在这个频道上发布。在测试中,我希望Flux 仅在Mono 有效轮询时才开始发布数据。由于Mono 上的投票不会在订阅后立即开始,所以我在开始发布之前一直使用固定的delaySubscription

Mono<...> polling;
Flux<...> dataPublisher;
polling
     .zipWith(dataPublisher.collectList().delaySubscription(Duration.ofSeconds(1)))
     .block()

这种方式“有效”,但有点不稳定,因为发布发生在轮询 Mono 可能尚未准备好轮询的时候。

我试图找到另一种不那么不稳定的方法来测试它,但还没有找到。任何帮助将不胜感激。

【问题讨论】:

标签: java spring-webflux project-reactor


【解决方案1】:

您正在寻找接收器。基本上它是一个你可以触发的东西,然后你可以用它来启动一个新的单声道(单个触发器)/通量(多个触发器)。 一个简化的例子:

    @Test
    public void pollAndPublish() {
        //create a sink to trigger
        Sinks.One<Boolean> sink = Sinks.one();
        //start the polling mono, I'm asusming polling starts somewhere after some time, I made it 2 seconds
        Mono<String> polling = Mono.just("I start polling after some time")
                                   .delayElement(Duration.ofSeconds(2))
                                   .map(it -> "I'm polling")
                                   .doOnNext(it -> sink.tryEmitValue(true))
                                   .delayElement(Duration.ofSeconds(10))
                                   .map(it -> "I'm done polling");

        //turn the trigger into a new mono and map it to your publisher flux
        Flux<String> dataPublisher = sink.asMono()
                                         .flatMapMany(it -> Flux.just("elements", "to", "publish")
                                                                .delayElements(Duration.ofSeconds(1)));

        //start the polling pipeline
        polling.subscribe();

        //start the publisher pipeline
        StepVerifier.create(dataPublisher)
                    .expectNext("elements")
                    .expectNext("to")
                    .expectNext("publish")
                    .verifyComplete();
    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多