【问题标题】:Spring Reactor Merge vs ConcatSpring Reactor 合并与 Concat
【发布时间】:2018-07-06 19:12:30
【问题描述】:

我在玩 Spring reactor,我看不出 concatmerge 运算符之间有什么区别

这是我的例子

    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.concat(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);    
}

两者的行为完全相同。有人可以解释这两种操作之间的区别吗?

【问题讨论】:

    标签: java spring flux reactor


    【解决方案1】:

    merge 和 concat 的本质区别在于,在 merge 中,两个流都是实时的。在 concat 的情况下,第一个流被终止,然后另一个流被连接到它。

    Concat


    Merge

    【讨论】:

    • 但是合并中的发射顺序怎么可能总是一样的呢?
    • 顺序是什么?
    • 通量 1、2 和 3 一直
    • 还有时差吗?
    • 不,从来没有,这就是我提出问题的原因
    【解决方案2】:

    API 文档中已经提到了不同之处,虽然 concat 首先完全读取一个通量,然后将第二个通量附加到该通量,但 merge 运算符不保证两个通量之间的顺序。

    为了看到区别,修改你的merge()代码如下。

    例如下面的示例代码

    //Flux with Delay
    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("Hello", "Vikram");
    
        flux1 = Flux.interval(Duration.ofMillis(3000))
        .zipWith(flux1, (i, msg) -> msg);
        
        
        Flux<String> flux2 = Flux.just("reactive");
        flux2 = Flux.interval(Duration.ofMillis(2000))
                .zipWith(flux2, (i, msg) -> msg);
        
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .subscribe(System.out::println);
    
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    

    当您修改 Flux.interval 持续时间时,当前设置为 3000 毫秒,您将看到使用 merge() 的输出不断变化。但是使用 concat(),输出将始终相同。

    【讨论】:

    • 嗨@Vikram Rawat,你的例子的输出是反应性的,世界,你好,维克拉姆。如果它没有任何睡眠,为什么不首先显示“世界”这个词?
    • 嗨@AleGallagher,很好。我已经编辑了代码。如果你运行它,输出将是世界,反应,你好,维克拉姆。较早的代码在 doOnNext 中调用睡眠,这使订阅者进入睡眠状态,而不是延迟通量 2 的“反应性”发射。因此,我将通量 2 更改为现在延迟数据的发射(“反应性”)。希望这是有道理的。
    【解决方案3】:

    另一个值得注意的区别是 concat 的所有变体都懒惰地订阅第二个流,只有在第一个流终止之后。

    而合并的所有变体热切地订阅发布者(所有发布者一起订阅)

    运行以下代码突出显示了这一方面:

    //Lazy subscription of conact
    Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
                    Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
                    .subscribe(System.out::println, System.out::println);
    
    
    
    //Eager subscription of the merge. Also, try mergeSequential.
    Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
                    Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
                    .subscribe(System.out::println, System.out::println);
    

    【讨论】:

      猜你喜欢
      • 2018-11-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-02
      • 2020-09-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多