【问题标题】:When is an observable chain triggered by main thread主线程何时触发可观察链
【发布时间】:2018-07-11 17:23:35
【问题描述】:

我从不同的服务调用中获取一个人的信息。有些可以并行执行。其他依赖于并行调用返回的信息。 计算完所有信息后,我将人员信息保存到存储库中

这是我设置流程的方式:

并行调用:

private Observable<Person> getDataForEval(Person person){
 Person person = new Person();
 Observable<Salary> observeSalary = getSalary(person).subscribeOn(Schedulers.io);
 Observable<HomeAddress> observeHome = getHomeAddress(person).subscribeOn(Schedulers.io);
 return  Observable.zip(observeSalary , observeHome , (salary, home) -> buildPersonAfterGettingData(salary,home));
}

依赖计算:

private Observable<Person> getDataAfterCalc(Person person) {
  Observable<LoanEligiblity> loanEligible= getLoanEligibility(person);
  Observable<Tax> observeTax= getTax(person);
  return Observable.zip(loanEligible, observeTax, (loan, tax) ->
                buildFinalPersonInfo(loan, tax));
}

在主线程中链接两者:

Observable<Person> finalPersonInfo = getDataForEval.flatMap(person -> getDataAfterCalc(person));
finalPersonInfo.subscribe( finalPerson -> save(finalPerson));

问题 - 何时触发此流程?我的理解是,当主线程调用 subscribe 方法时,会触发 Observable.zip() - 用于进行并行调用 - 并且后续订阅者会收到响应。

正确吗?如果我需要知道处理一个人所花费的时间,我可以计算如下:

finalPersonInfo.onSubscribe(()->start).onTerminate(()->finish);

【问题讨论】:

    标签: java java-8 rx-java observable rx-java2


    【解决方案1】:

    你的理解几乎是对的。

    zip 操作员在下游订阅它时订阅它的源。请参阅ObservableZip#72ObservableZip#110

    但是zip操作符与并行无关,并行调用是因为subscribeOn操作符。

    你用处理一个人来衡量时间的方法是对的。

    顺便说一句,在这个场景中你应该使用Single 而不是Observable

    Single 类为单值响应实现了响应式模式。 Single 的行为与 Observable 相同,只是它只能发出单个成功值或错误(没有像 Observable 那样的“onComplete”通知)

    我的 RxJava 源码版本是 2.1.13

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多