【问题标题】:Wait for previous Rx Observable to Finish等待之前的 Rx Observable 完成
【发布时间】:2016-06-26 03:37:48
【问题描述】:

您好,我正在使用 RxJava 进行磁盘存储获取和设置操作。基本上我有这样的方法:

public Observable<String> getStorageItem(String id, String type) {
    return Observable.defer(new Func0<Observable<String>>() {
        // Run db operations to get storage item.
    }
}

问题是这个方法getStorageItem(...) 有可能连续被订阅多次。并且 observable 中的数据库操作不能同时运行。我在这里最好的选择是什么?我应该手动创建某种排序吗?或者 RxJava 是否有某种工具可以让我阻止操作,直到前一个操作完成?

【问题讨论】:

  • 实际上我认为我可以在我的所有 Db 操作中使用同步锁来解决这个问题。傻我。但是很想听听其他人对此是否有任何意见。
  • 你可以看看flatMap(Func1, int)是否符合你的需求(reactivex.io/RxJava/javadoc/rx/…

标签: android rx-java


【解决方案1】:

您可以将subscribeOn 与从ExecutorService 创建的单线程调度程序一起使用,以确保只有一个数据库操作正在进行:

ExecutorService exec = Schedulers.newSingleThreadExecutor();
Scheduler s = Schedulers.from(exec);

public Observable<String> getStorageItem(String id, String type) {
    return Observable.fromCallable(() -> {
        // Do DB operations 
    });
}

getStorageItem("1", "2").subscribeOn(s).subscribe(...);
getStorageItem("2", "4").subscribeOn(s).subscribe(...);
getStorageItem("3", "6").subscribeOn(s).subscribe(...);

但请注意,通过将计算移出调用者的线程,它可以随时执行。如果你需要单独等待(因为getStorageItem已经在某个线程上被调用),你可以在subscribeOn之后申请toBlocking()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-06-20
    • 1970-01-01
    • 1970-01-01
    • 2018-10-06
    • 2018-08-06
    • 1970-01-01
    • 2018-09-05
    • 2017-09-20
    相关资源
    最近更新 更多