【发布时间】:2020-10-11 12:00:02
【问题描述】:
假设有一个Flux 发射一些有效载荷:
Flux<Integer> payloads = Flux.range(0, 5)
.delayElements(Duration.ofSeconds(2));
这个只发出 0 到 4 之间的整数,每 2 秒一次。
任务是将另一个信号混合到这个Flux 中。另一个信号只是每 N 个单位时间发出一些东西。像这样的:
Flux<Integer> heartbeats = Flux.just(-1)
.repeat()
.delayElements(Duration.ofSeconds(1));
- 生成的
Flux应该每 2 秒输出一次从 0 到 4 的数字,也应该每 1 秒输出一次数字 -1 - 此外,生成的
Flux必须在payloadsFlux完成后立即完成 - 它应该传播两个
Fluxes 中的任何一个发出的任何错误信号
(请注意,上面描述的示例仅用于说明目的。真正的Flux 发射有效载荷可能会以无法预测的速度发射:有时快,有时慢,有时很长一段时间根本不发射)。
我可以使用“开箱即用”运算符实现的最佳效果是
Flux<Integer> flux = payloads.mergeWith(heartbeats);
但这违反了要求 2,因为合并结果仅在所有合并组件完成时才完成,而 heartbeats 永远不会完成。
(实际上,如果在payloads 完成时有一个设置为true 的标志,并且在heartbeats 上使用repeat(BooleanSupplier) 变体而不是repeat(),这可能会起作用,但这会延迟完成生成的Flux 直到下一个心跳的时刻。)
接下来要尝试的是编写我自己的 Publisher 实现,该实现将以我的任务所需的方式工作,但我想避免这种情况,因为实现 Publisher+Subscription 似乎正确这是一项棘手的任务。
有没有更简单的方法?
【问题讨论】:
标签: java reactive-programming project-reactor