【问题标题】:RxJava async task in AndroidAndroid中的RxJava异步任务
【发布时间】:2015-04-16 09:22:13
【问题描述】:

我正在尝试在 Android 中使用 RxJava 实现异步任务。 我尝试了以下代码,但没有成功。它在 UI 线程上执行。我正在使用以下版本的 RxAndroid 0.24.0。

try {
    Observable.just(someMethodWhichThrowsException())
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> onMergeComplete());
}
catch (IOException e) {
    e.printStackTrace();
}

但是,以下内容对我来说是异步的。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                someMethodWhichThrowsException();
            } catch (IOException e) {
                e.printStackTrace();
            }

            subscriber.onCompleted();
        }
    });
    observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();

我正在尝试理解以下内容:

  1. 它们有什么区别?
  2. 创建异步任务时的最佳做法是什么?

【问题讨论】:

    标签: android android-asynctask rx-java


    【解决方案1】:
    1. 它们之间有什么区别?
    Observable.just(someMethodWhichThrowsException())
        .subscribeOn(Schedulers.newThread())
    

    这等价于:

    Object someResult = someMethodWhichThrowsException();
    Observable.just(someResult)
        .subscribeOn(Schedulers.newThread())
    

    如您所见,这首先进行同步方法调用,然后将其传递给Observable.just 以成为 Observable。

    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                ...
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
    

    但是,此方法将运行call 块中的代码订阅。你已经告诉它你想订阅一个新线程(subscribeOn(Schedulers.newThread())),所以订阅发生在一个新线程上,并且在订阅上运行的代码(call 块)也在那个线程上运行。这与调用Observable.defer 的行为类似。

    1. 创建异步任务时的最佳做法是什么?

    嗯,这取决于你和你想要的行为。有时您希望异步代码立即开始运行(在这种情况下,您可能希望使用其中一个操作符来缓存它)。我肯定会考虑为此使用Async Utils 库。

    其他时候,您会希望它仅在订阅时运行(这是此处示例中的行为) - 例如,如果有副作用,或者如果您不在乎它何时运行而只想使用内置插件从 UI 线程中获取一些东西。 Dan Lew mentions Observable.defer 非常方便在转换为 Rx 期间将旧代码从 UI 线程中取出。

    【讨论】:

    • 谢谢...这真的很有帮助。
    • 没问题!其他答案给了你一个解决方案,但并没有真正给出太多解释。我建议使用内置的 defer 而不是添加额外的库,尤其是对于 Android - 减小 APK 的大小:)
    • 谢谢,我没有意识到这一点。我认为这应该是推荐的方法。 :)
    • Async Utils 库似乎很旧,而且快一年没有更新了。
    • 这是一个小型库,如果它稳定,则无需更新它,尽管我理解这种情绪。他们目前是accepting pull requests,所以它绝对没有死!这个库所做的一切你都可以自己构建,但是当你开始使用 Rx 时,它特别有用。
    【解决方案2】:

    使用来自RxJava Async Utils 库的 Async.start()。这将调用您在另一个线程上提供的函数。

    例子:

    Observable<String> observable = Async.start(new Func0<String>() {
        @Override
        public String call() {
            try {
                return someMethodWhichThrowsException();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    });
    

    正如您所注意到的,已检查的异常必须包装到 RuntimeExceptions 中。

    另见https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start

    【讨论】:

    • 谢谢...这真的很有帮助。如果您对我的原始代码的差异有任何指示,那就太好了?
    • 我可以回答。 Observable.just() 允许从现有结果创建一个 observable。如果您编写 Observable.just(myFunction()),则执行 myFunction(),然后将结果传递给 just 方法(就像在任何 Java 表达式中一样)。所以这根本不是异步的。另一方面,当你调用 Async.start() 时,你的计算方法还没有被调用。稍后将在另一个线程中调用它,此时 Rx 将调用 func.call()(其中 func 是提供给 Async.start() 的参数)。
    • Async.start 上的注释,它会立即开始工作,甚至在订阅之前,这有时正是你想要的。如果您想等到订阅,请使用Async.toAction(...).call()Observable.defer。在这种情况下,defer 将处理异常清理器,因为您可以 return Observable.error(new MyException()); 而不是将其扔在那里,尽管抛出的异常会产生相同的结果。
    • @lopar,RxJavaAsyncUtil 提供了override start signature,它还获得了一个Scheduler,它是“运行函数的调度程序”。我可以相信这个细节吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多