【发布时间】:2016-07-06 15:57:11
【问题描述】:
你好 RxJava 大师,
在我当前的 Android 项目中,我在使用 RxJava 和 SQLite 时遇到了一些死锁问题。我的问题是:
- 我在线程上启动事务
- 调用 Web 服务并在数据库中保存一些内容
- concat 映射另一个可观察函数
- 尝试在数据库上写其他东西 ---> 遇到死锁
这是我的代码:
//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
Observable.just(null)
/* Go to known thread to open db transaction */
.observeOn(mScheduler)
.doOnNext(o -> myStore.startTransaction())
/* Do some treatments that change thread */
.someWebServiceCallWithRetrofit()
/* Return to known thread to save items in db */
.observeOn(mScheduler)
.flatMap(items -> saveItems(items))
.subscribe();
public Observable<Node> saveItems(List<Item> items) {
Observable.from(items)
.doOnNext(item -> myStore.saveItem(item)) //write into the database OK
.concatMap(tab -> saveSubItems(item));
}
public Observable<Node> saveSubItems(Item item) {
return Observable.from(item.getSubItems())
.doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}
为什么突然间 RxJava 正在改变线程?即使我指定了我希望他在我自己的调度程序上进行观察。我通过在 saveSubItem 之前添加另一个 observeOn 进行了肮脏的修复,但这可能不是正确的解决方案。
我知道,当您通过改造调用 Web 服务时,响应会转发到一个新线程(这就是为什么我创建了自己的调度程序以返回我开始我的 sql 事务的线程)。但是,我真的不明白 RxJava 是如何管理线程的。
非常感谢您的帮助。
【问题讨论】:
标签: java android multithreading transactions rx-java