【问题标题】:Why does the Observable not create on the right thread?为什么 Observable 不在正确的线程上创建?
【发布时间】:2015-05-02 05:34:59
【问题描述】:
Observable observable = Observable.from(backToArray(downloadWebPage("URL")))
            .map(new Func1<String[], Pair<String[], String[]>>() {
                @Override
                public Pair<String[], String[]> call(String[] of) {
                    return new Pair<>(of,
                            backToArray(downloadWebPage("URL" + of[0])).get(0));
                }
            });

    observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(
            (new Observer<Pair>() {

                @Override
                public void onCompleted() {
                    // Update user interface if needed
                }

                @Override
                public void onError(Throwable t) {
                    // Update user interface to handle error
                }

                @Override
                public void onNext(Pair p) {
                   offices.add(new Office((String[]) p.first, (String[]) p.second));
                }
           }));

这运行,我得到 android.os.NetworkOnMainThreadException。我希望它运行 subscribeOn() 方法设置的新线程。

【问题讨论】:

    标签: android network-programming system.reactive rx-java rx-android


    【解决方案1】:

    假设实际的网络请求发生在downloadWebPage(),错误在你代码的第一行:

    Observable observable = Observable.from(backToArray(downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode)))
    

    这相当于:

    String[] response = downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode)
    
    Observable observable = Observable.from(backToArray(response))
    

    这应该清楚地表明 downloadWebPage 是在主线程上执行的 - 在任何 Observable 被创建之前,更不用说订阅了。 RxJava 在这方面无法改变 Java 的语义。

    但是你可以做的是这样的事情(未经测试,但应该是正确的):

    Observable observable = Observable.create(new Observable.OnSubscribe<String[]>() {
            @Override
            public void call(final Subscriber<? super String[]> subscriber) {
                final String[] response = downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode);
                if (! subscriber.isUnsubscribed()) {
                    subscriber.onNext(backToArray(response));
                    subscriber.onCompleted();
                }
            }
    )
    

    现在您的网络请求将仅在 Observable 被订阅后发生,并将被移动到您在 subscribeOn() 中指定的线程。

    【讨论】:

    • 您可以在订阅时使用Observable.defer() 创建 Observable。看看我的回答。
    【解决方案2】:

    您可以使用 defer() 将调用 downloadWebPage 推迟到您订阅 observable 的那一刻。

    例子:

    private Object slowBlockingMethod() { ... }
    
    public Observable<Object> newMethod() {
        return Observable.defer(() -> Observable.just(slowBlockingMethod()));
    }
    

    Source

    【讨论】:

      【解决方案3】:

      你应该改变

      **observable.subscribeOn(Schedulers.newThread())**
      

      **observable.subscribeOn(Schedulers.io())**
      

      【讨论】:

        猜你喜欢
        • 2020-09-10
        • 1970-01-01
        • 1970-01-01
        • 2016-12-01
        • 2014-03-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多