【问题标题】:When FlatMap will listen to multiple sources concurrently?FlatMap 什么时候会同时监听多个源?
【发布时间】:2018-04-22 12:26:33
【问题描述】:

哪些情况会导致Flux::flatMap同时监听多个源(0...无穷大)?


我在实验时发现,当上游在线程thread-upstream-1 中向flatMap 发送信号并且有N 内部流,flatMap 将侦听并且它们每个都以不同的方式发送信号 /em> 线程:thread-inner-stream-i for 1<=i<=N,比对于每个 1<=i<=N 如果thread-upstream-1 != thread-inner-stream-iflatMap同时侦听所有内部流。

我认为这并不完全正确,我错过了其他一些场景。

【问题讨论】:

    标签: java spring reactive-programming project-reactor reactive-streams


    【解决方案1】:

    flatMap 不做任何并行工作,例如:它不改变线程。最简单的例子是

    Flux.range(1, 5).hide()
        .flatMap(v -> Flux.range(10 * v, 2))
        .log()
        .blockLast(); //for test purpose
    

    打印出来:

    [main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
    [main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(10)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(11)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(20)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(21)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(30)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(31)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(40)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(41)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(50)
    [main] INFO  reactor.Flux.FlatMap.1 - onNext(51)
    [main] INFO  reactor.Flux.FlatMap.1 - onComplete()
    

    如您所见,仅在main 中生成。如果在初始范围后添加publishOnflatMap 会在publishOn 将切换到的同一单线程中生成所有内容。

    然而,flatMap 所做的是订阅多个内部Publisher,直到concurrency 参数,默认为Queues.SMALL_BUFFER_SIZE (256)。

    这意味着如果您将其设置为3flatMap 会将 3 个源元素映射到它们内部的 Publisher 并订阅这些发布者,但会等待至少一个完成后才开始映射更多源元素。

    如果内部Publisher使用publishOnsubscribeOn,那么flatMap自然会让它们的事件发生在当时定义的线程中:

    Flux.range(1, 5).hide()
        .flatMap(v -> Flux.range(v * 10, 2)
                          .publishOn(Schedulers.newParallel("foo", 3)))
        .flatMap(v -> Flux.range(10 * v, 2))
        .log()
        .blockLast(); //for test purpose
    

    哪些打印:

    [main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
    [main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
    [foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(10)
    [foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(11)
    [foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(20)
    [foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(21)
    [foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(30)
    [foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(31)
    [foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(50)
    [foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(51)
    [foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(40)
    [foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(41)
    [foo-4] INFO  reactor.Flux.FlatMap.1 - onComplete()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-02-26
      • 1970-01-01
      • 2016-05-24
      • 1970-01-01
      • 1970-01-01
      • 2016-08-06
      • 2014-01-28
      • 2014-05-15
      相关资源
      最近更新 更多