【发布时间】:2019-10-21 06:41:22
【问题描述】:
Flux.create 和 Flux.push 有什么区别?我正在寻找(理想情况下是使用示例用例)来了解何时应该使用其中一个。
【问题讨论】:
Flux.create 和 Flux.push 有什么区别?我正在寻找(理想情况下是使用示例用例)来了解何时应该使用其中一个。
【问题讨论】:
来自https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html的文档
创建() 通过 FluxSink API 以编程方式创建能够以同步或异步方式发射多个元素的 Flux。
推() 以编程方式创建一个 Flux,该 Flux 能够通过 FluxSink API 从单线程生产者发出多个元素。
使用 create(),您可以从多个线程生成项目。仅当您不打算使用多个线程时才使用 push()。
【讨论】:
FluxJavaDoc 复制和粘贴内容并不是一个好的答案。
create() 或push() 之间进行选择。我想说push() 确实经过优化以更快地工作,唯一的缺点是它适用于单线程。否则 - 竞争条件。您绝对可以在 Docs 中找到更多信息:projectreactor.io/docs/core/release/reference/#producing。不知道我们还能在这里解释什么......
正如文档所说:
如果您想适应一些其他异步外部 API 而不必担心取消和背压(由 this to 方法自动处理),它们会很有用。
这里是一个例子:
@Test
void testCreateToWrapMultiThreadsAsyncExternalAPI() {
SequenceCreator sequenceCreator = new SequenceCreator();
int numberOfElements = 10000;
StepVerifier.create(sequenceCreator.createNumberSequence(numberOfElements))
.expectNextCount(numberOfElements)
.verifyComplete();
}
@Slf4j
class SequenceCreator {
public Flux<Integer> createNumberSequence(Integer elementsToEmit) {
return Flux.create(sharedSink -> multiThreadSource(elementsToEmit, sharedSink));
}
void multiThreadSource(Integer elementsToEmit, FluxSink<Integer> sharedSink) {
Thread producingThread1 = new Thread(() -> emitElements(sharedSink, elementsToEmit / 2), "Thread_1");
Thread producingThread2 = new Thread(() -> emitElements(sharedSink, elementsToEmit / 2), "Thread_2");
producingThread1.start(); // Start to emit elements
producingThread2.start();
try {
producingThread1.join(); // Wait that thread finishes
producingThread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
sharedSink.complete();
}
public void emitElements(FluxSink<Integer> sink, Integer count) {
IntStream.range(1, count + 1).boxed().forEach(n -> {
log.info("onNext {}", n);
sink.next(n);
});
}
}
这里你有一个并行发射元素的源。源由 2 个线程组成,每个线程发出 numberOfElements / 2 元素,对应于本例中线程 1 和线程 2 发出的 5000 个元素。此源使用 create 方法包装,在这里我们测试总共发出 10'000 个元素。测试通过了。
现在尝试将create 替换为push。测试不会通过(如果通过,请使用更大的数字作为numberOfElements)。这是因为push 预计只有一个生产线程可以一次调用next、complete 或error,因为它不会以并发方式管理 FluxSink API 上的使用.
希望这个玩具示例可以帮助您了解何时使用其中一个而不是另一个。
【讨论】: