【问题标题】:Reactor GroupedFlux - wait to completeReactor GroupedFlux - 等待完成
【发布时间】:2018-07-13 00:22:39
【问题描述】:

有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理?
当然,不必添加一个未知持续时间的睡眠......

@Test
public void groupByPublishOn() throws InterruptedException {
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    List<Integer> results = new ArrayList<>();
    Flux<Flux<Integer>> groupPublisher = processor.publish(1)
                                                  .autoConnect()
                                                  .groupBy(i -> i % 2)
                                                  .map(group -> group.publishOn(Schedulers.parallel()));

    groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

    List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
    input.forEach(processor::onNext);
    processor.onComplete();

    Thread.sleep(500);

    Assert.assertTrue(results.size() == input.size());
}

【问题讨论】:

    标签: project-reactor


    【解决方案1】:

    您可以替换这些行:

     groupPublisher.log()
                      .subscribe(g -> g.log()
                                       .subscribe(results::add));
    

    有了这个

    groupPublisher.log()
                  .flatMap(g -> g.log()
                                 .doOnNext(results::add)
                  )
                  .blockLast();
    

    flatMap 是一种比 subscribe-within-subscribe 更好的模式,它会为您负责订阅该组。

    doOnNext 负责处理消耗的副作用(向集合添加值),让您无需在订阅中执行该操作。

    blockLast() 替换了订阅,而不是让你为它阻塞直到完成的事件提供处理程序(并返回最后一个发出的项目,但你已经在 doOnNext 中处理了它)。

    【讨论】:

    • 感谢您的回答,但我无法立即致电blockLast()。我使用UnicastProcessor 作为“热源”来推送从外部系统接收到的事件。所以在我定义了流之后,我想开始订阅事件。我只需要在测试中等待完成。如果没有blockLast(),还有其他方法吗?
    • 没关系。我发现我可以使用flatMap() 的结果来调用subscribe()。然后从测试中调用onComplete()blockLast()
    【解决方案2】:

    使用 blockLast() 的主要问题是,如果您的操作无法完成,您将永远不会释放您的管道。

    您需要做的是获取 Disposable 并检查是否已完成管道,这意味着布尔值 isDisposed 它将返回 true。

    然后由您决定是否要超时,例如延迟计数实现:)

    int 计数 = 0;

    @Test
    public void checkIfItDisposable() throws InterruptedException {
        Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .map(number -> {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return number;
                }).subscribeOn(Schedulers.newElastic("1"))
                .subscribe();
    
        while (!subscribe.isDisposed() && count < 100) {
            Thread.sleep(400);
            count++;
            System.out.println("Waiting......");
        }
        System.out.println("It disposable:" + subscribe.isDisposed());
    

    如果你想使用 blockLast,至少添加一个超时

    @Test
    public void checkIfItDisposableBlocking() throws InterruptedException {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .map(number -> {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return number;
                }).subscribeOn(Schedulers.newElastic("1"))
                .blockLast(Duration.of(60, ChronoUnit.SECONDS));
        System.out.println("It disposable");
    }
    

    如果您需要更多 ides https://github.com/politrons/reactive,可以在此处查看更多 Reactor 示例

    【讨论】:

    • 感谢您的回答。其实我想避免Tread.sleepblockLast 工作正常。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-10-04
    • 2020-07-04
    • 2011-12-28
    • 2011-09-08
    • 1970-01-01
    • 2011-10-14
    • 1970-01-01
    相关资源
    最近更新 更多