【问题标题】:How can I create a BehaviorSubject that subscribes to an observable?如何创建订阅 observable 的 BehaviorSubject?
【发布时间】:2017-07-06 05:37:47
【问题描述】:

我有一个函数应该返回一个BehaviorSubject。该主题旨在返回Profile的最新版本

(user)Profile 只是一个包含对三个成员的引用的 POJO:
- User,
- 该用户的MeasurementList,
- 和Deadline

其中两个属性是通过改造调用获得的,其中一个已经保存在类变量中。

每当 observable 发出一个新的 measurement listdeadline 时,BehaviorSubject 应该发出一个新的更新的 Profile。

这是一个(希望有帮助的)图表,说明应该发生的事情

这是我目前所拥有的

 public BehaviorSubject<Profile> observeProfile() {

        if (profileBS == null) {

            profileBS = BehaviorSubject.create();

            Observable o = Observable.combineLatest(
                    Observable.just(userAccount),
                    observeMeasurements(),
                    observeDeadline(),
                    Profile::new
            );


            profileBS.subscribeTo(o); //subscribeTo does not exist, but this is what I am trying to figure out how to do.

        }
        return profileBS;
    }

谁能帮我正确地创建这个 BehaviorSubject 吗?

谢谢。

【问题讨论】:

    标签: java android retrofit rx-java reactivex


    【解决方案1】:

    Subject 实现了Observer 接口,因此您可以执行以下操作

    public BehaviorSubject<Profile> observeProfile() {
    
            if (profileBS == null) {
    
                profileBS = BehaviorSubject.create();
    
                Observable o = Observable.combineLatest(
                        Observable.just(userAccount),
                        observeMeasurements(),
                        observeDeadline(),
                        Profile::new
                );
    
                // Note the following returns a subscription. If you want to prevent leaks you need some way of unsubscribing.
                o.subscribe(profileBS); 
    
            }
            return profileBS;
    }
    

    请注意,您应该想出一种处理结果订阅的方法。

    【讨论】:

    • 谢谢。答案很简单,但最初对我来说似乎有点违反直觉。
    • @Stephen 这个答案会起作用,但有一些条件。正如我在回答中所说,您过早订阅上游。所以你的下游会失去它的事件。
    • @PhoenixWang 我现在正在阅读您的答案。我会看看。实际上我现在必须去上班,但稍后会跟进。感谢您的提醒。
    • 请注意,在 RxJava2 中,subscribe(Observer) 不会返回订阅 (Disposable)。 “与 1.x 版的 Observable 不同,subscribe(Observer) 不允许外部取消订阅,而 Observer 实例预计会公开此类功能。”
    【解决方案2】:

    我想我和你面临同样的问题。在我回答之前,我想讲一些历史。 所以我想很多人都看到了 JakeWharton 在 Devoxx 的演讲:The State of Managing State with RxJava

    所以他想出了一个基于一些变形金刚的架构。但问题是,即使是那些 Transformer 实例也能在您的 ViewObservables 之外生存。每次您的 ViewObservable 使用它们时,它们仍然会创建新的 Observable。是因为算子的概念。

    因此,一个常见的解决方案是使用主题作为网关,就像您在问题中所做的那样。但是新的问题是你订阅源太早了。

    订阅操作应该在subscribeActual() 中完成,这将由您的下游的subscribe() 方法触发。但是你在中间订阅了你的上游。你失去了你的事件。 我为解决这个问题而苦苦挣扎,但始终没有找到解决方案。

    但最近感谢 Davik 的回答:RxJava - .doAfterSubscribe()? 我想出了这样的事情:

    public BehaviorSubject<Profile> observeProfile() {
        public Observable resultObservable;
        if (profileBS == null) {
    
            profileBS = BehaviorSubject.create();
            //this source observable should save as a field as well 
            o = Observable.combineLatest(
                    Observable.just(userAccount),
                    observeMeasurements(),
                    observeDeadline(),
                    Profile::new
            );
            //return this observable instead of your subject
            resultObservable = profileBS.mergeWith(
                Observable.empty()
                          .doOnComplete(() -> {
                              o.subscribe(profileBS);
                          }))
        } return resultObservable;
    }
    

    诀窍就是这样。您使用这种方法来制作像doAfterSubscribe 这样的运算符。所以只有下游已经订阅了你的主题。您的主题将订阅您的原始上游来源。

    希望这会有所帮助。对不起,我的英语不好。

    【讨论】:

    • susbcribeActual 我认为只有 rxJava2。
    • @JohnWowUs 是的,但概念是一样的。 Obseravble 应该考虑通过触发subscribe() 方法来延迟初始化。但是在此方法中,Subject 订阅原始源,因此无论您的“主要”订阅是否订阅您的主题,它都会从源接收事件。这就是你输掉赛事的原因。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-05
    • 1970-01-01
    相关资源
    最近更新 更多