【问题标题】:Flux endpoint from infinite java stream来自无限 Java 流的通量端点
【发布时间】:2019-06-11 21:20:41
【问题描述】:

我在处理由 Stream.generate 构造构建的通量时遇到问题。

Java 流正在从远程源获取一些数据,因此我实现了一个自定义供应商,其中嵌入了数据获取逻辑,然后使用它来填充流。

Stream.generate(new SearchSupplier(...))

我的想法是检测空列表并使用takeWhile的Java9特性->

Stream.generate(new SearchSupplier(this, queryBody))
            .takeWhile(either -> either.isRight() && either.get().nonEmpty())

(使用 Vavr 的 Either 构造)

repositoroy 层 Flux 将执行以下操作:

return Flux.fromStream (
            this.searchStream(...) //this is where the stream gets generated
        )
        .map(Either::get)
        .flatMap(Flux::fromIterable);

“服务”层由通量上的一些转换步骤组成,但方法签名类似于Flux<JsonObject> search(...)

最后,控制器层有一个GetMapping:

@GetMapping(produces = "application/stream+json")
public Flux search(...) {
    return searchService.search(...) //this is the Flux<JsonObject> parth
         .subscriberContext(...) //stuff I need available during processing
         .doOnComplete(() -> log.debug("DONE")); 
}

我的问题是 Flux 似乎永远不会终止。 例如,从 Postman 打来的电话只是在响应部分中拍摄了“正在加载...”部分。当我从我的 IDE 中终止该过程时,结果会被刷新到邮递员,我看到了我所期待的。 doOnComplete lambda 也永远不会被调用

我注意到的是,如果我更改 Flux 的来源:

Flux.fromArray(...) //harcoded array of lists of jsons

doOnComplete lambda 被调用,http 连接也关闭,结果显示在邮递员中。

知道可能是什么问题吗?

谢谢。

【问题讨论】:

  • 我找到了一种指示通量完成的方法,方法是在控制器中设置超时并在订阅者上调用 onComplete: .timeout(ofSeconds(10), Subscriber::onComplete) .doOnComplete( () -> log.info("完成"));现在调用 doOnComplete lambda 并关闭 http 连接
  • 为什么不直接使用 Flux.generate 创建 Flux?您可以使用sink.complete 表示完成并在达到条件时终止 Flux。
  • 我试试;正在使用我拥有的旧实现中的 Stream.generate 东西

标签: java project-reactor spring2.x


【解决方案1】:

您可以使用如下所示的代码直接创建 Flux。请注意,我添加了一些假设的方法,您需要根据您的 SearchSupplier 的工作方式来实施这些方法:

Flux<SearchResultType> flux = Flux.generate(
            () -> new SearchSupplier(this, queryBody),
            (supplier, sink) -> {
                SearchResultType current = supplier.next();
                if (isNotLast(current)) {
                    sink.next(current);
                } else {
                    sink.complete();
                }
                return supplier;
            },
            supplier -> anyCleanupOperations(supplier)
        );

【讨论】:

  • 是的,通量生成方法确实有效。看到您的评论后,我尝试了它。仍然不确定为什么 java 流上的 takeWhile 没有终止。会接受答案,谢谢
  • 很遗憾,如果没有看到您的 SearchSupplier 实现的详细信息,我无法解释为什么流不会终止或将终止传播到 Flux。
  • 不用担心,如果允许的话会分享的。
猜你喜欢
  • 1970-01-01
  • 2017-09-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-11
  • 1970-01-01
  • 1970-01-01
  • 2010-09-18
相关资源
最近更新 更多