【发布时间】:2018-09-15 14:12:04
【问题描述】:
我有一个很容易制定的请求,但我无法在不泄漏资源的情况下完成它。
我想返回 application/stream+json 类型的响应,其中包含某人发布的新闻事件。我不想使用 Websockets,不是因为我不喜欢它们,我只是想知道如何使用流。
为此,我需要从我的 restcontroller 返回一个 Flux<News>,一旦有人发布任何消息,它就会不断地收到消息。
我的尝试是创建一个发布者:
public class UpdatePublisher<T> implements Publisher<T> {
private List<Subscriber<? super T>> subscribers = new ArrayList<>();
@Override
public void subscribe(Subscriber<? super T> s) {
subscribers.add(s);
}
public void pushUpdate(T message) {
subscribers.forEach(s -> s.onNext(message));
}
}
还有一个简单的新闻对象:
public class News {
String message;
// Constructor, getters, some properties omitted for readability...
}
和发布新闻的端点分别获取新闻流
// ...
private UpdatePublisher<String> updatePublisher = new UpdatePublisher<>();
@GetMapping(value = "/news/ticker", produces = "application/stream+json")
public Flux<News> getUpdateStream() {
return Flux.from(updatePublisher).map(News::new);
}
@PutMapping("/news")
public void putNews(@RequestBody News news) {
updatePublisher.pushUpdate(news.getMessage());
}
这可行,但我无法取消订阅或再次访问任何给定订阅 - 因此一旦客户端断开连接,updatePublisher 将继续推送越来越多的死频道 - 因为我无法调用 @订阅上的 987654327@ 处理程序。
TL;DL:
是否可以将消息从另一个线程推送到可能无穷无尽的 Flux 上,并且仍然按需终止 Flux,而不依赖于对等异常或类似的东西的重置?
【问题讨论】:
标签: java spring project-reactor