【发布时间】:2019-07-10 20:01:06
【问题描述】:
我对 RxJava2 有一个愚蠢的问题。
我需要同时运行两个长时间的操作。我知道我应该使用 Observable.zip() 并且我使用它。
问题是我的长操作是一个接一个地运行,另一个问题是我的长操作在我订阅它们之前就开始了。
假设这是我应该异步运行的长时间操作。
private String doSomethingLong() {
Random rand = new Random();
int value = rand.nextInt(5);
Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
try {
Thread.sleep(value * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
}
return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
}
还有一个像 test() 这样的方法会尝试使其并行:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable.zip(just1, just2, new Func2<String, String, Combined>() {
@Override
public Combined call(String s, String s2) {
return new Combined(s, s2);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Combined combined) {
long total = System.currentTimeMillis() - started;
Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
}
});
}
当我尝试运行它时,我观察到两个 observables just1 和 just2 一个接一个地运行......这让我很困惑......
但是还有另一个员工让我更加困惑...我评论了 Observable.zip 并注意到 just1 和 just2 在我订阅它们之前启动了方法 doSomethingLong()...
让我展示一下:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
// Observable.zip(just1, just2, new Func2<String, String, Combined>() {
// @Override
// public Combined call(String s, String s2) {
// return new Combined(s, s2);
// }
// }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
// @Override
// public void onCompleted() {
//
// }
//
// @Override
// public void onError(Throwable e) {
//
// }
//
// @Override
// public void onNext(Combined combined) {
// long total = System.currentTimeMillis() - started;
// Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
// }
// });
}
这段代码几乎相同 - 它一个接一个地运行了两次 doSomethingLong()...
我的期望: 1. 我需要 doSomethingLong() 方法并行运行 2. 我要求在开始订阅它们之前解释为什么这些方法会运行。 3. 在这种情况下我应该如何写好我的代码。我希望在订阅之前不要调用 doSomethingLong() 方法。
非常感谢。希望我能很好地解释问题。
【问题讨论】:
-
您需要
defer。将调用方法doLongStuff调用just。您也可以使用create而不是just。 -
这是一个常见的误解,之前确实被问过很多次。根本的问题是括号并没有神奇地推迟方法的执行——不知道为什么人们会这么想。您调用
doSomethingLong()来获取一个值,然后使用该值创建一个源,此时从 RxJava 的角度来看,该值是一个常量引用。
标签: java observable rx-java reactive-programming rx-java2