【发布时间】:2018-09-22 21:58:36
【问题描述】:
我一直在阅读 Reactor 文档,但我无法为以下问题找到合适的模式。 我有一个应该异步执行某些操作的方法。我以 Flux 的形式返回结果响应,消费者可以订阅它。
该方法有如下定义:
Flux<ResultMessage> sendRequest(RequestMessage message);
返回的通量是热通量,结果可以在任何给定时间异步出现。
潜在消费者可以通过以下方式使用它:
sendRequest(message).subscribe(response->doSomethinWithResponse(response);
实现可以是这样的:
Flux<ResultMessage> sendRequest(RequestMessage message) {
Flux<ResultMessage> result = incomingMessageStream
.filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
.take( 2 );
// The message sending is done here...
return result;
}
incomingMessageStream 是通过此通道的所有消息的Flux。
这个实现的问题是消费者在结果消息到来之后才被订阅,它可能会错过其中的一些。
所以,我正在寻找一种解决方案,让消费者不依赖订阅时间。可能根本不需要潜在消费者订阅生成的Flux。我正在寻找一个通用的解决方案,但如果不可能,您可以假设结果消息的数量不大于 2。
【问题讨论】:
标签: java reactive-programming project-reactor