【发布时间】:2019-07-23 08:12:47
【问题描述】:
以下代码的问题是订阅者只能看到第一个通量的项目(即只打印1)。有趣的是,如果我添加delayElements,它可以正常工作。
这是一个玩具示例,但我的意图是将其替换为Flux,它发出 HTTP GET 请求并发出其结果(也可能不止两个)。
所以重新表述我的问题,我有一个需要实施的多对一关系。考虑到我的情况,如何实施?你会使用某种处理器吗?
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
});
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
尝试使用 TopicProcessor 实现相同的想法,但遇到相同的问题:
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
TopicProcessor<Integer> processor = TopicProcessor.create();
flux1.subscribe(processor);
flux2.subscribe(processor);
processor.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
【问题讨论】:
-
"Thread.currentThread().join();"假设某个其他线程将中断当前线程。另一个线程是什么?
标签: java reactive-programming project-reactor reactor reactive-streams