【问题标题】:What is the difference between Flux.create() vs Flux.push() in project reactor?项目反应堆中的 Flux.create() 与 Flux.push() 有什么区别?
【发布时间】:2019-10-21 06:41:22
【问题描述】:

Flux.create 和 Flux.push 有什么区别?我正在寻找(理想情况下是使用示例用例)来了解何时应该使用其中一个。

【问题讨论】:

    标签: project-reactor reactor


    【解决方案1】:

    来自https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html的文档

    创建() 通过 FluxSink API 以编程方式创建能够以同步或异步方式发射多个元素的 Flux。

    推() 以编程方式创建一个 Flux,该 Flux 能够通过 FluxSink API 从单线程生产者发出多个元素。

    使用 create(),您可以从多个线程生成项目。仅当您不打算使用多个线程时才使用 push()。

    【讨论】:

    • 该问题要求使用代码示例进行解释。简单地从FluxJavaDoc 复制和粘贴内容并不是一个好的答案。
    • 我投票赞成这个答案,因为从代码的角度来看没有什么可显示的。您只需要从源代码中确定它是单线程的还是多线程的。然后在create()push() 之间进行选择。我想说push() 确实经过优化以更快地工作,唯一的缺点是它适用于单线程。否则 - 竞争条件。您绝对可以在 Docs 中找到更多信息:projectreactor.io/docs/core/release/reference/#producing。不知道我们还能在这里解释什么......
    • 嘿,为了完成三重奏,我介于两者之间。没有什么更明智的可以添加到答案中了,但我会说在不引用源代码的情况下从 Javadocs 复制/粘贴仍然是一种不好的形式。
    【解决方案2】:

    正如文档所说:

    • create:通过 FluxSink API 以编程方式创建能够以同步或异步方式发射多个元素的 Flux。这包括从多个线程发射元素。
    • push:以编程方式创建一个 Flux,该 Flux 能够通过 FluxSink API 从一个单线程生产者发出多个元素。

    如果您想适应一些其他异步外部 API 而不必担心取消和背压(由 this to 方法自动处理),它们会很有用。

    这里是一个例子:

    @Test
    void testCreateToWrapMultiThreadsAsyncExternalAPI() {
        SequenceCreator sequenceCreator = new SequenceCreator();
        int numberOfElements = 10000;
    
        StepVerifier.create(sequenceCreator.createNumberSequence(numberOfElements))
                .expectNextCount(numberOfElements)
                .verifyComplete();
    }
    
    
    @Slf4j
    class SequenceCreator {
    
        public Flux<Integer> createNumberSequence(Integer elementsToEmit) {
            return Flux.create(sharedSink -> multiThreadSource(elementsToEmit, sharedSink));
        }
    
        void multiThreadSource(Integer elementsToEmit, FluxSink<Integer> sharedSink) {
            Thread producingThread1 = new Thread(() -> emitElements(sharedSink, elementsToEmit / 2), "Thread_1");
            Thread producingThread2 = new Thread(() -> emitElements(sharedSink, elementsToEmit / 2), "Thread_2");
    
            producingThread1.start(); // Start to emit elements
            producingThread2.start();
    
            try {
                producingThread1.join(); // Wait that thread finishes
                producingThread2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            sharedSink.complete();
        }
    
        public void emitElements(FluxSink<Integer> sink, Integer count) {
            IntStream.range(1, count + 1).boxed().forEach(n -> {
                log.info("onNext {}", n);
                sink.next(n);
            });
        }
    }
    

    这里你有一个并行发射元素的源。源由 2 个线程组成,每个线程发出 numberOfElements / 2 元素,对应于本例中线程 1 和线程 2 发出的 5000 个元素。此源使用 create 方法包装,在这里我们测试总共发出 10'000 个元素。测试通过了。

    现在尝试将create 替换为push。测试不会通过(如果通过,请使用更大的数字作为numberOfElements)。这是因为push 预计只有一个生产线程可以一次调用nextcompleteerror,因为它不会以并发方式管理 FluxSink API 上的使用.

    希望这个玩具示例可以帮助您了解何时使用其中一个而不是另一个。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-12
      • 2019-10-23
      • 2011-08-18
      • 1970-01-01
      • 1970-01-01
      • 2021-12-03
      相关资源
      最近更新 更多