【问题标题】:RxJava changing thread after concat mapRxJava 在 concat 映射后更改线程
【发布时间】:2016-07-06 15:57:11
【问题描述】:

你好 RxJava 大师,

在我当前的 Android 项目中,我在使用 RxJava 和 SQLite 时遇到了一些死锁问题。我的问题是:

  1. 我在线程上启动事务
  2. 调用 Web 服务并在数据库中保存一些内容
  3. concat 映射另一个可观察函数
  4. 尝试在数据库上写其他东西 ---> 遇到死锁

这是我的代码:

//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());


Observable.just(null)
            /* Go to known thread to open db transaction */
            .observeOn(mScheduler)
            .doOnNext(o -> myStore.startTransaction())
            /* Do some treatments that change thread */
            .someWebServiceCallWithRetrofit()
            /* Return to known thread to save items in db */
            .observeOn(mScheduler)
            .flatMap(items -> saveItems(items))
            .subscribe();

public Observable<Node> saveItems(List<Item> items) {
    Observable.from(items)
            .doOnNext(item -> myStore.saveItem(item)) //write into the database OK
            .concatMap(tab -> saveSubItems(item));
}

public Observable<Node> saveSubItems(Item item) {
    return Observable.from(item.getSubItems())
            .doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}

为什么突然间 RxJava 正在改变线程?即使我指定了我希望他在我自己的调度程序上进行观察。我通过在 saveSubItem 之前添加另一个 observeOn 进行了肮脏的修复,但这可能不是正确的解决方案。

我知道,当您通过改造调用 Web 服务时,响应会转发到一个新线程(这就是为什么我创建了自己的调度程序以返回我开始我的 sql 事务的线程)。但是,我真的不明白 RxJava 是如何管理线程的。

非常感谢您的帮助。

【问题讨论】:

    标签: java android multithreading transactions rx-java


    【解决方案1】:

    副作用运算符(如flatMap 一样)在调用它的任何线程上同步执行。尝试类似

    Observable.just(null)            
              .doOnNext(o -> myStore.startTransaction())
              .subscribeOn(mScheduler)      // Go to known thread to open db transaction 
                /* Do some treatments that change thread */
              .someWebServiceCallWithRetrofit()                      
              .flatMap(items -> saveItems(items))
              .subscribeOn(mScheduler) // Return to known thread to save items in db
              .observeOn(mScheduler) // Irrelevant since we don't observe anything
              .subscribe();
    

    【讨论】:

      【解决方案2】:

      据我所知,doOnNext 方法在与之前的代码不同的线程中调用,因为它与序列的其余部分异步运行。

      示例:您可以进行多次休息调用,将其保存到数据库并在doOnNext(...) 内部通知视图/演示者/控制器进度。您可以在保存到数据库之前或/和保存到数据库之后执行此操作。 我建议您“平面映射”代码。

      所以saveItems 方法看起来像这样(如果myStore.saveSubItems 返回结果):

      public Observable<Node> saveSubItems(Item item) {
      return Observable.from(item.getSubItems())
              .flatMap(subItem -> myStore.saveSubItems(subItem))
      }
      

      使用“flatMapping”保证操作在与前一个序列相同的线程上运行并且序列继续然后flaMap函数结束。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-06-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-03-26
        相关资源
        最近更新 更多