【问题标题】:Observable.just(doSomeLongStuff()) run doSomeLongStuff() before I subscribe to observableObservable.just(doSomeLongStuff()) 在我订阅 observable 之前运行 doSomeLongStuff()
【发布时间】:2019-07-10 20:01:06
【问题描述】:

我对 RxJava2 有一个愚蠢的问题。

我需要同时运行两个长时间的操作。我知道我应该使用 Observable.zip() 并且我使用它。

问题是我的长操作是一个接一个地运行,另一个问题是我的长操作在我订阅它们之前就开始了。

假设这是我应该异步运行的长时间操作。

private String doSomethingLong() {
        Random rand = new Random();
        int value = rand.nextInt(5);
        Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
        try {
            Thread.sleep(value * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
        }
        return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
    }

还有一个像 test() 这样的方法会尝试使其并行:

public void test() {

        final long started = System.currentTimeMillis();
        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());


        Observable.zip(just1, just2, new Func2<String, String, Combined>() {
            @Override
            public Combined call(String s, String s2) {
                return new Combined(s, s2);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Combined combined) {
                long total = System.currentTimeMillis() - started;
                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
            }
        });

    }

当我尝试运行它时,我观察到两个 observables just1 和 just2 一个接一个地运行......这让我很困惑......

但是还有另一个员工让我更加困惑...我评论了 Observable.zip 并注意到 just1 和 just2 在我订阅它们之前启动了方法 doSomethingLong()...

让我展示一下:

public void test() {

        final long started = System.currentTimeMillis();
        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());


//        Observable.zip(just1, just2, new Func2<String, String, Combined>() {
//            @Override
//            public Combined call(String s, String s2) {
//                return new Combined(s, s2);
//            }
//        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
//            @Override
//            public void onCompleted() {
//
//            }
//
//            @Override
//            public void onError(Throwable e) {
//
//            }
//
//            @Override
//            public void onNext(Combined combined) {
//                long total = System.currentTimeMillis() - started;
//                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
//            }
//        });

    }

这段代码几乎相同 - 它一个接一个地运行了两次 doSomethingLong()...

我的期望: 1. 我需要 doSomethingLong() 方法并行运行 2. 我要求在开始订阅它们之前解释为什么这些方法会运行。 3. 在这种情况下我应该如何写好我的代码。我希望在订阅之前不要调用 doSomethingLong() 方法。

非常感谢。希望我能很好地解释问题。

【问题讨论】:

  • 您需要defer。将调用方法doLongStuff 调用just。您也可以使用create 而不是just
  • 这是一个常见的误解,之前确实被问过很多次。根本的问题是括号并没有神奇地推迟方法的执行——不知道为什么人们会这么想。您调用doSomethingLong() 来获取一个值,然后使用该值创建一个源,此时从 RxJava 的角度来看,该值是一个常量引用。

标签: java observable rx-java reactive-programming rx-java2


【解决方案1】:

Observable.just 在您订阅时不会运行任何内容。它会在您订阅时发出元素,但您的 doSomethingLong 将在您将其作为参数传递后立即运行。这很正常,这就是语言的运作方式。

您正在寻找的是一种在我们订阅时返回此内容的方式,但也只能在那时运行它,并希望在后台线程上运行。

对此有几个答案,以下是一些:

使用延迟

有一个名为 defer 的运算符,它接受一个 lambda,一旦你订阅就会执行:

Observable.defer(() ->  doSomethingLong())

这只会在您订阅时执行doSomethingLong

使用 fromCallable

您可以从 lambda 创建一个 observable。这被称为fromCallable

Observable.fromCallable(() -> doSomethingLong())

同样,这只会在您订阅时运行doSomethingLong

使用创建

我认为这可能是最令人沮丧的方式,因为您必须处理一些事情,但我认为为了完整性,可以提及:

Observable.create( emitter -> {
    if(emitter.isDisposed()) return;

    emitter.onNext(doSomethingLong());
    emitter.onComplete();
});

同样,我确信还有更多方法可以做到这一点。我只是想解释一下这个问题并给出一些选择。

【讨论】:

    【解决方案2】:

    【讨论】:

      猜你喜欢
      • 2018-03-13
      • 1970-01-01
      • 1970-01-01
      • 2016-12-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多