【发布时间】:2015-03-10 22:16:10
【问题描述】:
我很难使用 Spring Reactor Stream API(类似于 rxjava)在我的服务中构造一个响应对象,该对象包装由两个下游服务提供的响应。
以下是我的 Channel Consumer 上的 accept() 方法。为了保护无辜者,有些名字已经改了..
因此,FooRequest 包装了许多 BarRequests,每个 BarRequests 都有一个关联的分类请求和一个关联的验证请求。我们想要 1) 转换为 FooRequest,2) 将 FooRequest 转换为一系列 BarRequests,3) 为每个 BarRequest 运行两个下游请求,4) 将我们所有的 BarResponse 对象聚合到整体响应,5) 将响应发送回客户端。
我遇到问题的地方是toList() 方法,它似乎永远不会执行。每次我尝试涉及Promise 的事情时,它似乎总是会崩溃,这也不例外。
FooRequestFunction、BarRequestStreamFunction 相当简单,而且似乎运行良好。它们的方法签名是:
// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);
还有:
// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);
DownstreamRequestZipFunction 看起来像这样:
这似乎工作正常,只要两个下游请求函数都返回结果。
最后,链式调用结束时的 Consumer 有这个签名:
// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)
它所做的是 await() 响应承诺,然后将所有这些响应聚合到单个 XML 文档中写回通道。我可以说执行永远不会达到这种方法,因为没有任何日志记录会触发。这一切似乎都停止在 .toList() 处。
有谁知道为什么这个设置似乎会执行toList() 或之后的任何内容?
编辑:好的,我有更多信息。在为应用程序中的每个线程提供命名约定以使调试更容易之后,我可以看到“shared-1”,运行 accept() 方法的线程进入 WAITING 状态,然后停留在那里。这可能与底层 Dispatcher 是一个单线程的环形缓冲区调度程序这一事实有关。
我修改了代码,使方法略有不同,并使用了多线程调度程序,并避免使用Promise,但我仍然有一个状态,即链式调用集的尾部不会执行。见下文:
在上面,我已将 toList() 替换为对 reduce() 的调用,并将所有内容折叠成一个 List<BarResponse>。我可以看到这个执行和记录很好。但是,无论我对最后一次调用做什么,在尝试了 consume()、consumeOn() 等之后 - 它永远不会执行,也永远不会记录您在上面看到的最终调用。
查看 VisualVM,我可以看到调度程序线程都在与阻塞队列关联的同一个对象监视器上等待 - 换句话说,它们都在等待工作到达。就像 tail consumeOn() 调用被完全忽略一样。
我在这里做错了什么?我不明白什么?
编辑 2:鉴于 Johns 在下面的回复,我怀疑问题出在服务器设置上。可能仅适用于 reactor 版本 2.0.0.M2,在 Application 主类中配置如下:
没有为此配置调度程序,它似乎在后台使用 LMAX 破坏程序,而不是 NettyEventLoopDispatcher。目前尚不清楚如何设置NettyEventLoopDispatcher并将其用作替代调度程序。
【问题讨论】:
-
调度员能力够吗?
-
是的。我正在使用一个 Http 请求对其进行测试。它一到达 toList() 就会挂起
-
只是对上述内容的快速评论 - 这看起来像是 Netty 的某种问题。我们尝试了一个使用 RxJava/RxNetty 的等效程序,并且遇到了非常相似的问题。除此之外,我们无法了解更多信息,因为我们的任务预算已用完。
标签: java spring rx-java project-reactor