【问题标题】:Sharing Observable between Activities with RxJava and unsubscribing使用 RxJava 在活动之间共享 Observable 并取消订阅
【发布时间】:2017-02-07 08:43:38
【问题描述】:

我正在尝试实现一个计时器Observable,它在我的应用程序的Activities 之间共享。我正在一个作为 Dagger 单例的类上进行实现,我在每个不同的Activity 的每个Presenter 中注入它。

我用这种方式创建了一次 Observable:

Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();

我使用以下功能从 Presenter 进行订阅:

public Observable<Status> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mBasketStatus));

    mObservable.subscribe(subject);
    basketCounterCallback.onStatusChanged(status));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

我将 Subject 存储为 Observable 在每个演示者中,我打电话给: obs.unsubscribeOn(AndroidSchedulers.mainThread()) 取消订阅(在onPause() 方法中)。我还尝试使用调度程序取消订阅Schedulers.immediate()

但是无论如何都会调用回调 X 次(其中 X 是我已订阅计时器的所有 Presenter),因此它不会取消订阅。日志"Unsubcribed from subject!" 也没有被调用。

如何正确退订每个主题?

提前致谢

编辑:

由于 cmets 添加了更多实现细节:

这是我创建 Observable 并将其存储在 SingletonStatusManager 的成员中的部分(状态也是单例):

private Observable<BasketStatus> mObservable;
private Status mStatus;

public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {

    if (mObservable == null) mObservable = createObservable(milliseconds, status);

    return register(callback);
}

private Observable<BasketStatus> createObservable(long milliseconds, Status status) {

    mStatus = status;

    return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();
}

public Observable<BasketStatus> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mStatus));

    mObservable.subscribe(subject);
    callback.onStatusChanged(mStatus));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

在从启动计时器的Presenter 调用方法start(...) 之后,我从下一个演示者调用register(...) 方法:

class Presenter implements Callback {

    private Observable<BasketStatus> mRegister;

    @Inject
    public Presenter(Status status, StatusManager statusManager) {
        mRegister = statusManager.start(20000, status, this);
    }

    // Method called from onPause()
    public void unregisterFromBasketStatus() {
         mRegister.unsubscribeOn(Schedulers.immediate());
    }
}

还有下一位演讲者……

@Inject
public NextPresenter(StatusManager statusManager) {
    mBasketStatusManager.register(this);
}

【问题讨论】:

  • 让我们退后一步。你想达到什么目标?
  • 我想知道定时器什么时候结束,并且只通知那一刻显示的活动,并且只有当应用程序打开时才通知。所以我认为服务不是解决方案,所以我想到了这个解决方案。
  • mObservable 计时器吗?
  • 您的代码示例还不够。您退订了哪些 observable?由register 方法返回的那个还是存储在mObservable 中的可观察计时器?另外,你真的在​​ onPause 方法中调用.unsubscribe() 吗?在我看来,您似乎不明白unsubscribeOn() 做了什么......它只是指示在特定调度程序上运行取消订阅任务。它不会以任何方式调用它..我可能对您的理解有误,这就是我们需要更多代码的原因..
  • @Blackbelt 是的!

标签: android rx-java observable rx-android subject-observer


【解决方案1】:

正如我在评论中提到的,您没有得到 unsubscribeOn 运算符的行为。它不会取消订阅任何内容,而是告诉每个订阅者,当取消订阅发生时,它应该在哪里工作。这也没有意义,因为您不是从 Observable 退订,而是从代表观察者与流本身之间的连接的 Subscription 退订。

回到你的问题。您的代码现在设计的方式,您返回的主题完全没有用。您正在订阅注册方法中可观察的计时器,但是您将这些通知转发给之后任何人都不会订阅的主题。如果你真的需要它们,例如在我们看不到的代码的某些部分,你需要存储subscribe() 方法的结果,它是一个Subscription 对象,并在需要时调用unsubscribe() (我猜是unregisterFromBasketStatus)。

但代码中还有更多问题。例如:

subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable(); 

在第一行中,您不会将该操作的结果存储在任何地方。由于 Observables 是不可变的结构,每个操作符都会创建一个新的流来接收来自前一个的通知。由此,很明显,当您在第二行返回 subject.asObservable() 时,您不是从第一行返回已修改的 observable,而是返回未修改的旧的 observable。要纠正这个问题,只需将 subject 变量替换为结果:subject = subject.doOnUnsubscribe(() -&gt; L.LOGD(TAG, "Unsubcribed from subject!"));

其次,使用流的原因之一是完全替换回调。当然,它会起作用,但你正在减轻 Rx 为代码库带来的许多好处。然而,它的可读性要差得多,任何在你之后必须处理你的代码的人都会诅咒你到地狱,以至于他可能真的成功:-)) 我知道,Rx 从一开始就很难学习,但是试试尽量避免这种情况。

【讨论】:

  • 谢谢,这是一个很好的答案!但我有一个问题,我使用Subject 是为了在每次Presenter 订阅(register())可观察对象时不会重新启动计时器。如果我使用订阅,则每次订阅都会重新启动计时器,我不想要这种行为。现在我更好地理解了这些主题(谢谢:)),我可以订阅演示者的Subject 吗?除此之外,我正在使用 Callback 来将 RxJava 代码与 Presenters 解耦,你不认为这是一个好方法吗?
猜你喜欢
  • 2019-03-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多