【问题标题】:Reactor Sink that emits only 1 event at a time?一次只发出 1 个事件的反应器接收器?
【发布时间】:2021-07-23 22:39:38
【问题描述】:

我正在使用 Replaying Reactor Sinks,我正在尝试实现单播和重播处理器的混合。我希望它同时仅向一个订阅者发出(UnicastProcessor),但它也可以在订阅时发出默认值(ReplayProcessor)。这是与真实案例类似的情况:

Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    emiting next"));
for(int i = 0; i < 5; i++) {
    new Thread(() -> {
        monoC.flatMap(unused ->
                webClientBuilder.build()
                        .get()
                        .uri("https://www.google.com")
                        .retrieve()
                        .toEntityFlux(String.class)
                        .doOnSuccess(stringResponseEntity -> {
                            System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    finished processing");
                        })
        ).subscribe();
    }).start();
}

那是打印:

emiting next
...
emiting next
finished processing
...
finished processing

相反,我希望它打印出来:

emiting next
finished processing
...
emiting next
finished processing

更新,对真实案例场景的更多说明:

真实的情况是:我有一个 Spring WebFlux 应用程序,它的作用类似于中继,它在特定端点 A 上接收请求,并将其中继到另一个微服务 B。然后,如果我这样做,该微服务可以回复 429走得太快,并且在标题中我必须等待多长时间才能再次重试。我已经使用 .retry 运算符和 Mono.delay 实现了重试,但与此同时,我可以在我的第一个端点 A 上收到另一个请求,该请求必须被阻止,直到 Mono.delay 完成。

我正在尝试使用 Replay Sink 来实现这一点,因此在收到 429 后,我向 sink 发出“false”,并且在 Mono.delay 结束后,它向 sink 发出 true,所以如果在同时,当我在 A 上收到任何进一步的请求时,它可以过滤掉所有的错误并等待发出一个真值。

最重要的问题是,当我收到太多请求在 A 上中继时,微服务 B 开始响应缓慢,并且变得超载。因此,我想限制接收器发射的速率。准确地说,我希望发布者发出一个值,但在订阅者点击 onCompleted 之前不要再发出。

【问题讨论】:

  • 我不太了解您的用户案例。 I would like it to accept only one subscriber, so following subscribers are hold until first completes. 是什么意思,请描述您的实际用户案例。
  • 所以,如果您看到我提供的示例,我想序列化(按并发 1 顺序)从我的应用程序的不同位置完成的所有 GET 请求。
  • 我可以阅读您的代码,但我不关心您的代码,我想知道用户案例。你的意图是什么,你在建造什么,你想做什么。因为在反应式中我们通常不使用Thread 并且代码很乱,所以我不明白你在做什么。如果我不明白您想要做什么,也许我们可以帮助您以更正确的反应方式构建它。
  • 你的代码有几个很奇怪的地方,为什么布尔值的latestOrDefault?为什么你 flatMaping 超过 monoC 而不使用布尔值?您的代码在如何使用接收器方面还差得很远
  • 我建议你阅读 reactor 文档中的 sink 部分,其中描述了如何使用 sink。 projectreactor.io/docs/core/release/reference/#processors

标签: java spring-webflux project-reactor reactor


【解决方案1】:

只要我正确理解了您的问题,您就希望按顺序处理对 B 的请求。在这种情况下,你应该看看https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-

public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)

我认为你的情况应该是这样的

//sinks should be global variable for your controller, initialized in @PostConstruct
        var sinks = Sinks
                //unsafe is required for multithreading
                .unsafe()
                .many()
                .replay()
                .latest();
        sinks.asFlux()
                .doOnNext(it -> System.out.printf("%s is emitting %s\n", Thread.currentThread().getName(), it))
                .flatMap(counter -> {
                    return webClientBuilder.build()
                            .get()
                            .uri("https://www.google.com")
                            .retrieve()
                            .toEntityFlux(String.class)
                            .doOnSuccess(stringResponseEntity -> {
                                System.out.println(counter + " " + new Date() + " - " + Thread.currentThread().getName() + "    finished processing with " + stringResponseEntity.getStatusCode());
                            })
                            .then(Mono.just(counter));
                    //concurrency = 1 causes the flatMap being handled only once in parallel
                }, 1)
                .doOnError(Throwable::printStackTrace)
                //this subscription also must be done in @PostConstruct
                .subscribe(counter -> System.out.printf("%s completed in %s\n", counter, Thread.currentThread().getName()));

        //and this is your endpoint method
        for (int i = 0; i < 5; i++) {
            int counter = i;
            new Thread(() -> {
                var result = sinks.tryEmitNext(counter);
                if (result.isFailure()) {
                    //mb in that case you should retry
                    System.out.printf("%s emitted %s. with fail: %s\n", Thread.currentThread().getName(), counter, result);
                } else {
                    System.out.printf("%s successfully emitted %s\n", Thread.currentThread().getName(), counter);
                }
            }).start();
        }

【讨论】:

    猜你喜欢
    • 2010-12-09
    • 1970-01-01
    • 2021-11-10
    • 1970-01-01
    • 2023-04-02
    • 1970-01-01
    • 1970-01-01
    • 2020-03-19
    • 1970-01-01
    相关资源
    最近更新 更多