【问题标题】:Reactor / WebFlux implement a reactive http news tickerReactor / WebFlux 实现一个响应式 http 新闻自动收报机
【发布时间】:2018-09-15 14:12:04
【问题描述】:

我有一个很容易制定的请求,但我无法在不泄漏资源的情况下完成它。

我想返回 application/stream+json 类型的响应,其中包含某人发布的新闻事件。我不想使用 Websockets,不是因为我不喜欢它们,我只是想知道如何使用流。

为此,我需要从我的 restcontroller 返回一个 Flux<News>,一旦有人发布任何消息,它就会不断地收到消息。

我的尝试是创建一个发布者:

public class UpdatePublisher<T> implements Publisher<T> {

    private List<Subscriber<? super T>> subscribers = new ArrayList<>();

    @Override
    public void subscribe(Subscriber<? super T> s) {
        subscribers.add(s);
    }

    public void pushUpdate(T message) {
        subscribers.forEach(s -> s.onNext(message));
    }

}

还有一个简单的新闻对象:

public class News {
    String message;
    // Constructor, getters, some properties omitted for readability...
}

和发布新闻的端点分别获取新闻流

// ...

private UpdatePublisher<String> updatePublisher = new UpdatePublisher<>();

@GetMapping(value = "/news/ticker", produces = "application/stream+json")
public Flux<News> getUpdateStream() {
     return Flux.from(updatePublisher).map(News::new);
}

@PutMapping("/news")
public void putNews(@RequestBody News news) {
    updatePublisher.pushUpdate(news.getMessage());
}

这可行,但我无法取消订阅或再次访问任何给定订阅 - 因此一旦客户端断开连接,updatePublisher 将继续推送越来越多的死频道 - 因为我无法调用 @订阅上的 987654327@ 处理程序。

TL;DL:

是否可以将消息从另一个线程推送到可能无穷无尽的 Flux 上,并且仍然按需终止 Flux,而不依赖于对等异常或类似的东西的重置?

【问题讨论】:

    标签: java spring project-reactor


    【解决方案1】:

    您应该永远不要尝试自己实现Publisher 接口,因为它归结为正确实现反应式流。这正是您在这里面临的问题。

    您应该使用 Reactor 本身提供的生成器运算符之一(这实际上是一个 Reactor 问题,与 Spring WebFlux 无关)。

    在这种情况下,Flux.createFlux.push 可能是最佳候选者,因为您的代码使用某种类型的事件侦听器将事件推送到流中。 See the reactor project reference documentation on that.

    如果没有更多细节,很难为您提供解决问题的具体代码示例。不过,这里有一些提示:

    • 如果您想要一些类似多播的通信模式,您可能希望 .share() 为所有订阅者提供事件流
    • 注意你想要的推/拉/推+拉模型;背压应该如何在这里工作?如果我们生成更多订阅者可以处理的事件怎么办?
    • 此模型仅适用于单个应用程序实例。如果您希望这适用于多个应用程序实例,您可能需要使用代理研究消息传递模式

    【讨论】:

    • 我只是不知道如何以某种方式表明通量已完成(即用户关闭浏览器窗口)
    • 当用户关闭他们的浏览器窗口时,流量并没有被服务器“完成”,但它实际上被客户端取消了。这是由响应式流自动处理的。
    • 是的,但如果我使用 ie。 Flux.create 并注册一个侦听器,然后将消息推送到 Fluxsink - 它负责底层侦听器基础设施以不再推送消息 - 没有可以取消注册侦听器的组件
    • FluxSink API 提供您所需的一切。
    • 非常感谢您对此的帮助,最后的问题是,将 c-type 设置为“text/event-stream”而不是“application/stream+json”——至于我为什么去调查一下。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-20
    • 1970-01-01
    • 1970-01-01
    • 2019-07-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多