【问题标题】:Subscriber onNext is called before completion of the asynchronous requests in rxjava2在 rxjava2 中的异步请求完成之前调用订阅者 onNext
【发布时间】:2018-09-07 15:54:02
【问题描述】:

我已经使用 RxJava2 在 MVP 中实现了一个存储库模式

RemoteDataSource.java

public Observable<List<A>> getAList(){
          return ApiService.
                getAList()
                .compose(RxUtils.applySchedulers())
                .doOnSubscribe(disposable -> Timber.d(..))
                .doOnError(throwable -> Timber.d(..))
                .doOnComplete(() -> {
                    Timber.d(..);
                });
      }

LocalDataSource.java

public Observable<List<A>> getAList(){
    return mDbHelper .....from SQLBrite..
}

public void saveAList(List<<A> a){
    SQlBriteTransaction...
}

Repository.java(更新)

  @Inject
  public Repository(DownloadUtils downloadUtils){
   this.mDownloadUtils = downloadUtils;
   }

   @Override
   public Observable<List<A>> getAList(){
     return  mRemoteDataSource
            .getAList()
             .flatMapIterable(List<A> -> a)
            .flatMap(A a ->
      ************************************************************   
               return Observable.fromIterable(a.getB())
                     .flatMap((Function<B, ObservableSource<B>>) b ->
                                       Observable.create(emitter -> 
                                                emitter.onNext(new 
            DownloadUtils().downloadFiles(b,totalListCount,emitter))))
                                .toList()
                                .toObservable()

     ***************************************************
                    .toList()
                    .toObservable()
                    .doOnNext( List<A>  a -> {

     --------------Only the first change in B value is inserted in Db-
                       mLocalDataSource.saveAList(a);
                    });
    }

DownLoadUtils.java(更新)

void downloadBFiles(B b, int totalCount,ObservableEmitter<B> emitter){
        fileCount = b.size;
         b.get(index).setDataToChange(dataToChange);

         *** I am using PR Downloader for aynchronous download using 
            RECURSION **
        PRDownloader.download(remoteUrl, filePath, fileName)
                .build()
        .setOnStartOrResumeListener(() -> {
            })
            .setOnProgressListener(progress -> {
                int progressPercent = (int) (progress.currentBytes * 
                   100 / progress.totalBytes);,

             })
            .start(new OnDownloadListener() {
                @Override
                public void onDownloadComplete() {
  ********************* emitter.onComplete() ******************


             @Override
                public void onError(Error error) {
                  }   
     }

Presenter.java

void getVideosFromRepo(){

    disposable = mRepository
                 .getAList()
                 .doOnSubscribe(d _-> "Started Loading")
                 .subscribe(
                   //OnNext
    ------------- Here the OnNext is being called before Asynchronous Operation completes!!-------

                    List<A> a -> mView.setAList(a);
                   )


}

在演示者实现之上,甚至在异步下载完成之前返回 PresenteronNext 中的列表...需要进行哪些更改,以便 onNext(subscribe) 是在所有下载完成后调用完成。!!!

【问题讨论】:

    标签: android rx-java rx-java2 dagger-2 android-mvp


    【解决方案1】:

    您正在使用 RxJava 观察者链外部的异步服务,因此 RxJava 无法管理正在传递的数据。由于downloadBFiles() 使用了单独的观察者链,可以说你已经失去了线程。

    您需要使用flatMap(),而不是使用doOnNext() 来触发下载,以便下载结果包含在您的观察者链中。

    【讨论】:

    • ..hi.如果我用 flatMap 替换 doOnNext 是否能够返回 a 的值并更改 b..能否请您显示需要完成的代码更改!
    • 嗨..我尝试使用 flatMap 仍然得到相同的结果...!!而不是 doOnNext 我使用 .flatMap((Function, ObservableSource>>) List b -> Observable.fromCallable(() -> mDownloadUtils.downloadFiles(b,0)))..map(b -> { a.setB(b); return a; }))
    • downloadFiles() 还在使用单独的“PR 下载器”吗?如果是这样,flatMap() 将无济于事,因为它不会等待下载完成。
    • ...是的,它正在使用单独的下载器!!...有什么解决方法吗!!
    • 您可以创建一个包装单独下载器的 observable。使用Observable.create() 并让您的下载侦听器发出该值。在不知道您如何调用下载器的情况下,我无法向您展示示例代码。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多