【问题标题】:Project Reactor: Multiple Publishers making HTTP calls and one Subscriber to handle all resultsProject Reactor:多个发布者发出 HTTP 调用,一个订阅者处理所有结果
【发布时间】:2019-07-23 08:12:47
【问题描述】:

以下代码的问题是订阅者只能看到第一个通量的项目(即只打印1)。有趣的是,如果我添加delayElements,它可以正常工作。

这是一个玩具示例,但我的意图是将其替换为Flux,它发出 HTTP GET 请求并发出其结果(也可能不止两个)。

所以重新表述我的问题,我有一个需要实施的多对一关系。考虑到我的情况,如何实施?你会使用某种处理器吗?

 public static void main(String[] args) throws Exception {
    Flux<Integer> flux1 = Flux.generate(emitter -> {
        emitter.next(1);
    });

    Flux<Integer> flux2 = Flux.generate(emitter -> {
        emitter.next(2);

    });

    Flux<Integer> merged = flux1.mergeWith(flux2);
    merged.subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

尝试使用 TopicProcessor 实现相同的想法,但遇到相同的问题:

public static void main(String[] args) throws Exception {
    Flux<Integer> flux1 = Flux.generate(emitter -> {
        emitter.next(1);
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
    });

    Flux<Integer> flux2 = Flux.generate(emitter -> {
        emitter.next(2);
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
    });

    TopicProcessor<Integer> processor = TopicProcessor.create();
    flux1.subscribe(processor);
    flux2.subscribe(processor);

    processor.subscribe(s -> System.out.println(s));


    Thread.currentThread().join();
}

【问题讨论】:

  • "Thread.currentThread().join();"假设某个其他线程将中断当前线程。另一个线程是什么?

标签: java reactive-programming project-reactor reactor reactive-streams


【解决方案1】:

From the docs:

请注意,合并是为使用异步源或有限源而量身定制的。在处理尚未在专用调度器上发布的无限源时,您必须将该源隔离在其自己的调度器中,否则合并会在订阅另一个源之前尝试耗尽它。

您在这里创建了一个没有专用调度程序的无限源,因此它试图在合并之前完全耗尽该源 - 这就是您遇到问题的原因。

这在您的实际用例中可能不是问题,因为GET 请求的结果可能不会是无限的。但是,如果您想确保结果是交错的,您只需要确保使用自己的调度程序设置每个通量(通过在每个通量上调用 subscribeOn(Schedulers.elastic());。)

所以你的例子就变成了:

Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
        .subscribeOn(Schedulers.elastic());

Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
        .subscribeOn(Schedulers.elastic());

Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));

Thread.currentThread().join();

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-16
    • 1970-01-01
    • 1970-01-01
    • 2019-12-06
    • 2016-08-11
    • 2017-11-27
    相关资源
    最近更新 更多