【问题标题】:Flux does not wait for completion of elements before 'then'Flux 不会在“then”之前等待元素完成
【发布时间】:2020-10-16 20:50:40
【问题描述】:

我无法理解这个问题,我不确定我做错了什么

我想等待flux结束,然后返回serverResponse的mono

我已附上代码 sn-p,doOnNext 将填充 categoryIdToPrintRepository

我查看了如何在通量结束后返回单声道,发现“then”但仍然在处理 onNextSite 之前执行“then”方法,这导致错误:

java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry

我做错了什么?

 public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
        return Mono.just("start").flatMap(id ->
                Flux.fromIterable(appSettings.getSites())
                        .subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
                        .doOnNext(this::onNextSite)
                        .then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));

    }

    private void onNextSite(Integer siteId) {
        IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
            Optional<SiteCatalogCategoryDTO> cacheData =
                    siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
            cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
                    () -> {
                    Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
                            .get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
                    catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
                            handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
            });
        });
    }


    private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
                                          int siteId, int catalogId) {
        if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
            Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
                    .subscribe(mapSCC -> {
                        categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
                                "Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
                                        mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
                        siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
                    });
    }

【问题讨论】:

    标签: java spring-boot netty spring-webflux project-reactor


    【解决方案1】:

    你做错了几件事,你不应该在你的应用程序中subscribe,并且你有 void 方法,除非在特定的地方,否则不应该在反应式编程中使用。

    这里是一些示例代码:

    
    // Nothing will happen, we are not returning anything, we can't subscribe
    private void doSomething() {
        Mono.just("Foo");
    }
    
    // complier error
    doSomething().subscribe( ... );
    
    

    您的应用程序是调用客户端的publisher,是订阅者,这就是为什么我们向调用客户端返回 Mono 或 Flux,它们是 subscribe

    你已经这样解决了:

    private void doSomething() {
        Mono.just("Foo").subscribe( ... );
    }
    
    doSomething();
    

    现在您正在订阅自己以使事情运行,这不是正确的方式,如前所述,调用客户端是订阅者,而不是您。

    正确方法:

    private Mono<String> doSomething() {
        return Mono.just("Foo");
    }
    
    // This is returned out to the calling client, they subscribe
    return doSomething();
    

    当 Mono/Flux 完成时,它会发出一个信号,这个信号将触发链中的下一个和下一个。

    所以我对你需要做的事情的看法如下:

    • 删除all subscribes,如果你想做一些事情,有flatmapmapdoOnSuccess等函数。保持链一直保持不变客户。
    • 删除所有 void 函数,确保它们返回 FluxMono,如果你想返回一些东西,使用返回 Mono&lt;Void&gt; Mono.empty() 函数,这样链就完成了。

    一旦您使用 Mono/Flux,您需要处理返回,以便其他人可以链接。

    更新:

    为了触发then,你必须返回一些东西,它会在之前的单声道/通量完成时返回。

    示例:

    private Flux<String> doSomething() {
        return Flux.just("Foo", "Bar", "FooBar")
                   .doOnNext(string -> {
                       return // return something
                   });
    }
    
    // Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
    return doSomething().then( ... );
    

    【讨论】:

    • 我理解你的remakrs,但问题仍然存在,我不想返回通量,我想完成通量执行然后返回一个单声道作为对客户端的响应,如何这可以在 spring webflux 中完成吗?
    • 阅读then(Mono&lt;V&gt; other) 的文档,它声明Let this Flux complete then play signals from a provided Mono,这意味着如果您在通量完成时返回Flux(无论您希望它做什么),它将返回您放入的内容你的then
    • you should not subscribe in your application,为什么不呢? void 方法也适用于 doOnNext,它应该只用于副作用。问题是他们预计副作用会影响原始 Mono 链。此外,doOnNext 的 AFAIK 没有方法签名允许您返回任何内容,因为它只需要一个消费者。
    • 因为调用客户端失去了控制背压的能力。而且我从未声称 doOnNext 没有采用 void 方法,所以请在发表评论之前阅读。
    • 您无法控制doOnNext 中的背压,因此不确定您的观点。你也说You are doing several things wrong ... you are having void methods,但是 OP 在 doOnNext 中使用它们,那么这又是怎么回事?我读错了什么?
    猜你喜欢
    • 1970-01-01
    • 2021-11-07
    • 1970-01-01
    • 2022-01-04
    • 1970-01-01
    • 1970-01-01
    • 2017-03-15
    • 1970-01-01
    • 2020-01-31
    相关资源
    最近更新 更多