【问题标题】:Project reactor: collectList() doesn't work for Flux.create()项目反应堆:collectList() 不适用于 Flux.create()
【发布时间】:2018-04-09 12:20:14
【问题描述】:

下面的示例打印从 1 到 10 的整数和 (7, 8, 9, 10) 的列表

public void streamCollect() {

    ConnectableFlux<Integer> connect = Flux.range(1, 10)
            .publish();

    connect.subscribe(v -> System.out.println("1: " + v));

    connect
            .filter(number -> number > 6)
            .collectList()
            .subscribe(v -> System.out.println("4: " + v));

    connect.connect();
}

结果:

1:1

1:2

1:3

1:4

1:5

1:6

1:7

1:8

1:9

1:10

4: [7, 8, 9, 10]

下一个示例应该产生相同的结果,但只打印出从 1 到 10 的数字,而不是列表。为什么?

public void streamCollect() {

    ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {

        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .forEach(t -> emitter.next(t));
    }).publish();

    connect.subscribe(v -> System.out.println("1: " + v));

    connect
            .filter(number -> number > 6)
            .collectList()
            .subscribe(v -> System.out.println("4: " + v));

    connect.connect();
}

结果:

1:1

1:2

1:3

1:4

1:5

1:6

1:7

1:8

1:9

1:10

【问题讨论】:

    标签: java reactor


    【解决方案1】:

    collectList 等待 onComplete 信号,您不会在 create lambda 中产生该信号

    【讨论】:

    • 再次发挥魅力。我理解为什么必须调用 complete() ,但是如何通过 stackoverflow 以外的其他方式获得这些知识。我的意思是有一些一般规则吗?或者错误是在发射所有项目后在 FluxSink 上操作时应该始终调用 complete(),句号。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-16
    • 1970-01-01
    • 1970-01-01
    • 2020-05-27
    • 1970-01-01
    • 2019-10-23
    相关资源
    最近更新 更多