【问题标题】:How to execute Completable on Subscribe/Unsubscribe如何在订阅/取消订阅时执行 Completable
【发布时间】:2016-10-17 21:25:03
【问题描述】:

有我的方法:

public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
    api.openWorkshift(workshiftSettings)
            .compose(RxOperatorsHelpers.additionalStacktrace())
            .doOnSubscribe(() -> actionsSystem.registerAction(...).await()) // <-
            .doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()); // <-
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .subscribe(subscriber);
}

ActionsSystem.registerAction(...)/ActionsSystem.unregisterActions(...) 看起来像这样:

public Completable registerAction(OperatorAction action) {
    return Completable.fromAction(() -> actions.add(action));
}

public Completable unregisterAction(OperatorAction action) {
    return Completable.fromAction(() -> actions.remove(action));
}

如您所见,我使用.await() 在源Observable 的流中执行Completable。感觉像是错误的解决方案。 怎样才能做得更优雅?

【问题讨论】:

    标签: java rx-java rx-android


    【解决方案1】:

    由于您的Completables 执行微不足道的操作,您只需将他们的代码内联到doOnSubscribedoOnUnsubscribe

            .doOnSubscribe(() -> actions.add(action))
            .doOnUnsubscribe(() -> actions.remove(action))
    

    您可以通过从可完成的andThenObservable 序列的其余部分开始来避免doOnSubscribe

    actionsSyste.registerAction(...)
    .andThen(api.openWorkshift(workshiftSettings)
            .compose(RxOperatorsHelpers.additionalStacktrace())
            .doOnUnsubscribe(() -> actionsSyste.unregisterAction(...).await())
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
     )
     .subscribe(...)
    

    目前,当下游取消​​订阅时,没有办法执行Completable,当序列可能正常终止或出现异常时,也没有简单的方法来执行它。

    【讨论】:

    • 与将Observable 包装成andThen(observable) 相比,使用doOnSubscribe(...) 似乎更干净。在这种情况下,我也无法得到api.openWorkshift(...) 的结果。
    • 在“响应式架构”中不使用Observable/Completable/Single/whatever 进行琐碎操作是否正常?
    【解决方案2】:

    您可以使用Observable.defer。这个操作符延迟创建 observable 直到它被订阅:

    Observable observable = Observable.defer(() -> {
            actions.add(action);
            api.openWorkshift(workshiftSettings)
    }).compose(RxOperatorsHelpers.additionalStacktrace())
        .subscribeOn(ioScheduler)
        .observeOn(uiScheduler)
        .doOnError(this::handleError);
    

    然后使用Subscription。根据documentationSubscription.create()

    创建并返回一个在取消订阅时调用给定 Action0 的订阅。

    所以基本上你需要这样做:

    Subscription subscription = Subscriptions.create(new Action0() {
        @Override
        public void call() {
            actionsSyste.unregisterAction(...);
        }
    });    
    subscriber.add(subscription);
    observable.subscribe(subscriber);
    

    【讨论】:

    • 这是正确的解决方案,但它不如只调用Completable.await() :)
    猜你喜欢
    • 2022-01-19
    • 2012-03-14
    • 2015-08-24
    • 2019-12-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-18
    相关资源
    最近更新 更多