【问题标题】:How to run 2 queries sequentially in a Android RxJava Observable?如何在 Android RxJava Observable 中按顺序运行 2 个查询?
【发布时间】:2016-01-12 11:04:48
【问题描述】:

我想运行 2 个异步任务,一个接着另一个(按顺序)。我读过一些关于 ZIP 或 Flat 的东西,但我不是很了解它......

我的目的是从本地 SQLite 加载数据,完成后,它将查询调用到服务器(远程)。

有人可以建议我,实现这一目标的方法吗?

这是我正在使用的 RxJava Observable 骨架(单个任务):

    // RxJava Observable
    Observable.OnSubscribe<Object> onSubscribe = subscriber -> {
        try {

            // Do the query or long task...

            subscriber.onNext(object);
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    };

    // RxJava Observer
    Subscriber<Object> subscriber = new Subscriber<Object>() {
        @Override
        public void onCompleted() {
            // Handle the completion
        }

        @Override
        public void onError(Throwable e) {
            // Handle the error
        }

        @Override
        public void onNext(Object result) {

          // Handle the result

        }
    };

    Observable.create(onSubscribe)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);

【问题讨论】:

标签: android rx-java observable subscriber retrolambda


【解决方案1】:

执行此操作的操作员是merge,请参阅http://reactivex.io/documentation/operators/merge.html

我的方法是创建两个 observables,比如说 observableLocalobservableRemote,然后合并输出:

Observable<Object> observableLocal = Observable.create(...)
Observable<Object> observableRemote = Observable.create(...)
Observable.merge(observableLocal, observableRemote)
          .subscribe(subscriber)

如果你想确保远程在本地之后运行,你可以使用concat

【讨论】:

  • 这只有在 observableLocal 和 observableRemote 返回相同的对象或该对象扩展的东西时才有效
  • 在我的情况下,第二个观察者需要第一个观察者的输入,然后如何做到这一点 -stackoverflow.com/questions/58994906/…
【解决方案2】:

如果查询不相互依赖,Lukas Batteau 的答案是最好的。但是,如果您需要从本地 SQLite 查询中获取数据在运行远程查询之前(例如,您需要远程查询参数或标头的数据),那么您可以从本地可观察对象,然后将其平面映射以组合两个可观察对象您从本地查询中获取数据之后:

   Observable<Object> localObservable = Observable.create(...)
   localObservable.flatMap(object -> 
   {
       return Observable.zip(Observable.just(object), *create remote observable here*, 
           (localObservable, remoteObservable) -> 
           {
               *combining function*
           });
   }).subscribe(subscriber);

flatmap 函数允许您通过 zip 函数将本地 observable 转换为本地和远程 observable 的组合。重申一下,这里的优点是两个 observables 是顺序的,并且 zip 函数只会在两个依赖的 observables 运行后运行。

此外,即使底层对象具有不同的类型,zip 函数也允许您组合可观察对象。在这种情况下,您提供一个组合函数作为第三个参数。如果底层数据是同一类型,则将 zip 函数替换为合并。

【讨论】:

    【解决方案3】:

    您可以尝试我的解决方案,有几种方法可以解决您的问题。
    为了确保它正常工作,我创建了一个独立的工作示例并使用此 API 进行测试:https://jsonplaceholder.typicode.com/posts/1

    private final Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://jsonplaceholder.typicode.com/posts/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();
    
        private final RestPostsService restPostsService = retrofit.create(RestPostsService.class);
    
        private Observable<Posts> getPostById(int id) {
            return restPostsService.getPostsById(id);
        }
    

    RestPostService.java

    package app.com.rxretrofit;
    
    import retrofit2.http.GET;
    import retrofit2.http.Path;
    import rx.Observable;
    
    /**
     * -> Created by Think-Twice-Code-Once on 11/26/2017.
     */
    
    public interface RestPostsService {
    
        @GET("{id}")
        Observable<Posts> getPostsById(@Path("id") int id);
    }
    

    Solution1在顺序调用多个任务时使用,前一个任务的结果总是下一个任务的输入

    getPostById(1)
                    .concatMap(posts1 -> {
                        //get post 1 success
                        return getPostById(posts1.getId() + 1);
                    })
                    .concatMap(posts2 -> {
                        //get post 2 success
                        return getPostById(posts2.getId() + 1);
                    })
                    .concatMap(posts3 -> {
                        //get post 3success
                        return getPostById(posts3.getId() + 1);
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(finalPosts -> {
                        //get post 4 success
                        Toast.makeText(this, "Final result: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
                                Toast.LENGTH_LONG).show();
                    });
    

    Solution2在顺序调用多个任务时使用,之前任务的所有结果都是最终任务的输入(例如:上传头像图片后)和封面图片,调用api使用这些图片URL创建新用户)

    Observable
                    .zip(getPostById(1), getPostById(2), getPostById(3), (posts1, posts2, posts3) -> {
                        //this method defines how to zip all separate results into one
                        return posts1.getId() + posts2.getId() + posts3.getId();
                    })
                    .flatMap(finalPostId -> {
                        //after get all first three posts, get the final posts,
                        // the final posts-id is sum of these posts-id
                        return getPostById(finalPostId);
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(finalPosts -> {
                        Toast.makeText(this, "Final posts: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
                                Toast.LENGTH_SHORT).show();
                    });
    

    AndroidManifest

     <uses-permission android:name="android.permission.INTERNET"/>
    

    root build.gradle

    // Top-level build file where you can add configuration options common to all sub-projects/modules.
    
    buildscript {
        repositories {
            jcenter()
        }
        dependencies {
            classpath 'com.android.tools.build:gradle:2.3.3'
            classpath 'me.tatarka:gradle-retrolambda:3.2.0'
            classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
    
            // NOTE: Do not place your application dependencies here; they belong
            // in the individual module build.gradle files
        }
    
        // Exclude the version that the android plugin depends on.
        configurations.classpath.exclude group: 'com.android.tools.external.lombok'
    }
    
    allprojects {
        repositories {
            jcenter()
        }
    }
    
    task clean(type: Delete) {
        delete rootProject.buildDir
    }
    

    app/build.gradle

    apply plugin: 'me.tatarka.retrolambda'
    apply plugin: 'com.android.application'
    
    android {
        compileSdkVersion 26
        buildToolsVersion "26.0.1"
        defaultConfig {
            applicationId "app.com.rxretrofit"
            minSdkVersion 15
            targetSdkVersion 26
            versionCode 1
            versionName "1.0"
            testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
        }
        buildTypes {
            release {
                minifyEnabled false
                proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
            }
        }
    
        compileOptions {
            sourceCompatibility JavaVersion.VERSION_1_8
            targetCompatibility JavaVersion.VERSION_1_8
        }
    }
    
    dependencies {
        compile fileTree(dir: 'libs', include: ['*.jar'])
        androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
            exclude group: 'com.android.support', module: 'support-annotations'
        })
        compile 'com.android.support:appcompat-v7:26.+'
        compile 'com.android.support.constraint:constraint-layout:1.0.2'
        testCompile 'junit:junit:4.12'
    
        provided 'org.projectlombok:lombok:1.16.6'
        compile 'com.squareup.retrofit2:retrofit:2.3.0'
        compile 'com.squareup.retrofit2:converter-gson:2.3.0'
        compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
        compile 'io.reactivex:rxandroid:1.2.1'
    }
    

    型号

    package app.com.rxretrofit;
    import com.google.gson.annotations.SerializedName;
    /**
     * -> Created by Think-Twice-Code-Once on 11/26/2017.
     */
    public class Posts {
        @SerializedName("userId")
        private int userId;
        @SerializedName("id")
        private int id;
        @SerializedName("title")
        private String title;
        @SerializedName("body")
        private String body;
        public int getUserId() {
            return userId;
        }
        public void setUserId(int userId) {
            this.userId = userId;
        }
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getTitle() {
            return title;
        }
        public void setTitle(String title) {
            this.title = title;
        }
        public String getBody() {
            return body;
        }
        public void setBody(String body) {
            this.body = body;
        }
    }
    

    顺便说一句,使用 Rx + Retrofit + Dagger + MVP 模式 是一个很好的组合。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-21
      • 1970-01-01
      • 1970-01-01
      • 2019-09-02
      • 1970-01-01
      • 2017-03-17
      相关资源
      最近更新 更多