【发布时间】:2015-08-11 03:48:59
【问题描述】:
我是 RxJava 新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。
我有一个Observable<T>,其中T 是一些Closeable 资源(例如Android 数据库Cursor。
我可能在 observable 上有多个订阅者。在每个订阅者完成处理资源后,我想close() 资源。换句话说,在新资源交付/发射后关闭旧资源,最后一个订阅者退订时也关闭最后一个。
我尝试使用我称为AutoCloseOperator 的自定义运算符使其工作,它几乎可以工作,但不太正确。 IE。我仍然处于竞争状态和/或泄漏,例如资源没有关闭。
在 RxJava 中执行此操作的正确方法是什么?
假设我有这个代码:
final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);
Subscription s1 = o.subscribe(new Action1<MyResource>() {
public void call(MyResource myObj) {
System.out.println("s1 handling " + myObj);
}
});
subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered
Subscription s2 = o.subscribe(new Action1<MyResource>() {
public void call(MyResource myObj) {
System.out.println("s2 handling " + myObj);
}
});
subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));
s1.unsubscribe();
subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));
s2.unsubscribe();
subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));
那么我期望以下行为:
s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8
注意:我在我的真实代码中没有使用PublishSubject,我只是在这里使用它来进行说明,我使用Observable.create,它会在每次更新数据库表时发出Cursor...
概括问题:我可以只使用doOnNext 和doOnUnsubscribe 来关闭旧项目,但这并没有考虑到这些事件会发生多次(对于每个订阅者),我只想要当所有订阅者都收到新项目时关闭资源。
使用lift() 的自定义运算符是否可行,或者是否有一些现有的运算符可能对此有所帮助?
我已将我的问题简化为一个小型命令行应用程序here on GitHub。感谢收看!
【问题讨论】:
标签: android rx-java rx-android