【问题标题】:How do I configure backpressure in Spring WebFlux?如何在 Spring WebFlux 中配置背压?
【发布时间】:2020-05-30 00:17:12
【问题描述】:

我正在尝试了解如何在 Spring WebFlux 中应用背压。背压原理我懂,但是无法复现,所以没完全理解。

我们来看下面的例子:

public void test() throws InterruptedException {
    EmitterProcessor<String> processor = EmitterProcessor.create();

    new Thread(() -> {
        int i = 0;
        while(runThread) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            processor.onNext("Value: " + i);
            i++;
        }
        processor.onComplete();
    }).start();

    processor
            .subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}

private Consumer<String> makeSubscriber(String label) {
    return v -> {
        System.out.println(label + v);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
    };
}

我以 EmitterProcessor 的形式创建了一个 Hot Flux,并在一个单独的线程中开始为它生成数据。 低一点,我订阅它。订阅者的速度比生成元素的速度慢,所以问题应该开始出现,对吧? 但是订阅者逻辑在生产者线程上运行。当我调用 processor.onNext() 时,它会同步调用所有订阅者,所以如果订阅者很慢,发布者也会变慢。所以,那么背压似乎没有用。

我还尝试制作两个 Spring Boot WebFlux 应用程序,一个带有 Flux 端点,另一个使用端点,因此我可以确定消费者在单独的线程上运行。但是,我在消费者背压下所做的任何尝试都无济于事。没有缓冲区被填充,没有任何东西被丢弃或任何东西!

谁能给我一个背压的具体例子?最好在 Spring WebFlux 中,但我会采用任何反应式 Java 库。

【问题讨论】:

标签: java reactive-programming spring-webflux project-reactor backpressure


【解决方案1】:

您选择的subscribe 方法变体的文档如下:

订阅将请求无限制的需求 (Long.MAX_VALUE)。

也就是说,您自己关闭了背压。

要使用背压,请使用Flux.subscribe(Subscriber)订阅

【讨论】:

  • 即使我改变它,似乎也没有应用背压。我正在使用 .onBackpressureBuffer(4, BufferOverflowStrategy.DROP_LATEST) 但没有一个值被丢弃。
猜你喜欢
  • 1970-01-01
  • 2018-05-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-25
  • 2019-05-04
  • 2018-03-31
  • 2021-03-26
相关资源
最近更新 更多