【问题标题】:RxJava: closing a resource after every subscriber handled itRxJava:在每个订阅者处理资源后关闭资源
【发布时间】:2015-08-11 03:48:59
【问题描述】:

我是 RxJava 新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。

我有一个Observable<T>,其中T 是一些Closeable 资源(例如Android 数据库Cursor

我可能在 observable 上有多个订阅者。在每个订阅者完成处理资源后,我想close() 资源。换句话说,在新资源交付/发射后关闭旧资源,最后一个订阅者退订时也关闭最后一个。

我尝试使用我称为AutoCloseOperator 的自定义运算符使其工作,它几乎可以工作,但不太正确。 IE。我仍然处于竞争状态和/或泄漏,例如资源没有关闭。

在 RxJava 中执行此操作的正确方法是什么?

假设我有这个代码:

final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);

Subscription s1 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s1 handling " + myObj);
    }
});

subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered

Subscription s2 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s2 handling " + myObj);
    }
});

subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));

s1.unsubscribe();

subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));

s2.unsubscribe();

subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));

那么我期望以下行为:

s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8

注意:我在我的真实代码中没有使用PublishSubject,我只是在这里使用它来进行说明,我使用Observable.create,它会在每次更新数据库表时发出Cursor...

概括问题:我可以只使用doOnNextdoOnUnsubscribe 来关闭旧项目,但这并没有考虑到这些事件会发生多次(对于每个订阅者),我只想要当所有订阅者都收到新项目时关闭资源。

使用lift() 的自定义运算符是否可行,或者是否有一些现有的运算符可能对此有所帮助?

我已将我的问题简化为一个小型命令行应用程序here on GitHub。感谢收看!

【问题讨论】:

    标签: android rx-java rx-android


    【解决方案1】:

    Observable.using() 是您所需要的。

    如果你有 t 类型为 T.close() 方法,并且你想从 t(你的光标)中提取一些东西,说 Observable&lt;R&gt;,那么这里是如何做到的:

    Func0<T> resourceFactory = () -> t;
    Func1<T, Observable<R>> observableFactory = x -> ...
    Action1<T> disposeAction = x -> x.close();
    
    Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);
    

    你提到你有Observable&lt;T&gt;。要从所有 T 中获取所有 R,请使用上面的代码,如下所示:

    Observable<T> source = ...
    Observable<R> results = 
        source.flatMap(t -> {
            Func0<T> resourceFactory = () -> t;
            Func1<T, Observable<R>> observableFactory = x -> ...
            Action1<T> disposeAction = x -> x.close();
            return Observable.using(resourceFactory, observableFactory, disposeAction);});
    

    【讨论】:

    • 感谢您的回答!我已经尝试过Observable.using(),但我很难在没有过早关闭资源的情况下解决我的问题。当我有更多时间时,我会再试一次并提供示例代码反馈!谢谢!
    • 不用担心。你不能让你的资源流到 observableFactory 之外,否则你会遇到过早关闭的问题。从资源中提取您需要的内容并将其映射到与资源解耦的内容。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-02
    • 2022-09-29
    • 2022-07-12
    • 2019-05-16
    • 1970-01-01
    相关资源
    最近更新 更多