【问题标题】:Java Flux vs. Observable/BehaviorSubjectJava Flux vs. Observable/BehaviorSubject
【发布时间】:2020-06-19 17:33:19
【问题描述】:

我的问题是 Flux 是否有能力表现得像 Observable 或 BehaviorSubject。我想我明白了 Flux 的作用和方式的要点,但是我看到的每个教程都会创建一个静态内容的 Flux,即一些预先存在的数字数组,这些数组本质上是有限的。

但是,我希望我的 Flux 随着时间的推移成为一个未知值的流......就像一个 Observable 或 BehaviorSubject。有了这些,您可以创建一个类似 setNextValue(String value) 的方法,并将这些值发送给 Observable/BehaviorSubject 等的所有订阅者。

这可以通过 Flux 实现吗?还是说 Flux 必须首先由 Observable 类型的值流组成?

更新

我用下面的实现回答了我自己的问题。接受的答案可能会导致相同的路径,但稍微复杂。

【问题讨论】:

    标签: java spring rx-java project-reactor reactive-streams


    【解决方案1】:

    我看到的每个教程都会创建一个静态内容的 Flux,即一些预先存在的数字数组,它们本质上是有限的。

    您会看到这一点,因为大多数教程都关注如何操作和使用 Flux - 但这里的含义(您可以只使用带有静态、固定长度内容的 Flux)既令人遗憾,而且错误的。它比这更强大,并且几乎可以肯定将其与此类静态内容一起使用不是您在现实世界中看到的使用方式。

    基本上有 3 种不同的方式来实例化 Flux 以按照您的描述动态发出元素:

    但是,我希望我的 Flux 随着时间的推移成为一个未知值的流......就像一个 Observable 或 BehaviorSubject。有了这些,您可以创建一个类似 setNextValue(String value) 的方法,并将这些值发送给 Observable/BehaviorSubject 等的所有订阅者。

    当然 - 看看Flux.push()。这会暴露一个发射器,并且可以随时调用emitter.next(value)。这个流可以持续只要你想要它(无限,如果需要)。Flux.create()本质上是Flux.push()的多线程变体,它也可能有用。

    Flux.generate() 可能也值得一看——这有点像Flux.push() 的“按需”版本,您只在下游消费者请求时通过回调发出下一个元素,而不是发出随时随地。这并不总是实用的,但如果用例可行,则使用此方法是有意义的,因为它尊重背压,因此可以保证不会用超出其处理能力的请求压倒消费者。

    【讨论】:

    • 谢谢,我最近看到了这些,但试图找到一个如何使用它们的好例子。本质上,我正在尝试创建一个 StatusService,它公开最新状态的 Flux (将通过接受 String 并将该值发送到 Flux 的方法进行更新,该 Flux 由任何其他组件/类订阅)。所以我会尝试用这些方法来实现。
    【解决方案2】:

    可以这样实现:

    private EmitterProcessor<String> processor;
    private FluxSink<String> statusSink;
    private Flux<String> status;
    
    public constructor() {
        this.processor = EmitterProcessor.create();
        this.statusSink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
        this.status = this.processor.publish().autoConnect();
    }
    
    public Flux<String> getStatus() {
        return this.status;
    }
    
    public void setStatus(String status) {
        this.statusSink.next(status);
    }
    

    【讨论】:

    • 我测试了您的解决方案,但不幸的是它使用了已弃用的 API。
    猜你喜欢
    • 2017-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多