【发布时间】:2020-05-30 00:17:12
【问题描述】:
我正在尝试了解如何在 Spring WebFlux 中应用背压。背压原理我懂,但是无法复现,所以没完全理解。
我们来看下面的例子:
public void test() throws InterruptedException {
EmitterProcessor<String> processor = EmitterProcessor.create();
new Thread(() -> {
int i = 0;
while(runThread) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
processor.onNext("Value: " + i);
i++;
}
processor.onComplete();
}).start();
processor
.subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}
private Consumer<String> makeSubscriber(String label) {
return v -> {
System.out.println(label + v);
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
};
}
我以 EmitterProcessor 的形式创建了一个 Hot Flux,并在一个单独的线程中开始为它生成数据。 低一点,我订阅它。订阅者的速度比生成元素的速度慢,所以问题应该开始出现,对吧? 但是订阅者逻辑在生产者线程上运行。当我调用 processor.onNext() 时,它会同步调用所有订阅者,所以如果订阅者很慢,发布者也会变慢。所以,那么背压似乎没有用。
我还尝试制作两个 Spring Boot WebFlux 应用程序,一个带有 Flux 端点,另一个使用端点,因此我可以确定消费者在单独的线程上运行。但是,我在消费者背压下所做的任何尝试都无济于事。没有缓冲区被填充,没有任何东西被丢弃或任何东西!
谁能给我一个背压的具体例子?最好在 Spring WebFlux 中,但我会采用任何反应式 Java 库。
【问题讨论】:
-
@dfritsi 是的,我已经看到了这个问题及其答案,但老实说我还是不明白。我无法复制扩展订阅者示例。 GET 方法已经做了一些背压,但是作为发布者,这不是错误的地方吗?订阅者不应该说它想要多少吗?
标签: java reactive-programming spring-webflux project-reactor backpressure