【问题标题】:Concat multiple observables to one source将多个 observables 连接到一个源
【发布时间】:2020-12-28 14:02:18
【问题描述】:

我有很多发出不同类型的网络调用,例如字符串、整数等。

我正在尝试使它们平行。

在官方 rxjava 文档中我们可以阅读:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

这个例子很简单,因为我们将所有类型都设为 Int。但是,如果我们有不同的类型,例如如何做到这一点字符串、布尔值、整数?

  • 来自此 5 的每个调用都是独立的
  • 这5个调用中的一组将在一个方法中,并且该方法将被其他随机方法调用。
  • 我们可以假设,5 次调用的结果将是第一次调用的类型 -> string

【问题讨论】:

  • 我认为这个问题缺少一些信息:哪个事件触发了多个调用?多次调用的结果是什么?我想是将它们合并到一个对象中? Flowable 并不是在 Rx 中触发并行的唯一解决方案。
  • 我添加了很多附加信息

标签: java android kotlin rx-java rx-java2


【解决方案1】:

组合 Observables

使用多个源 Observable 来创建单个 Observable 的操作员

  • And/Then/When — 通过 Pattern 和 Plan 中介组合两个或多个 Observable 发出的项目集

  • CombineLatest — 当两个 Observable 中的任何一个发出 item 时,通过指定函数组合每个 Observable 发出的最新 item,并根据此函数的结果发出 item

  • Join — 当一个 Observable 中的一个项目在根据另一个 Observable 发射的项目定义的时间窗口内发射时,合并两个 Observable 发射的项目

  • Merge — 通过合并多个 Observable 的排放量来将它们合并为一个

  • StartWith — 在开始从源 Observable 发射项目之前发射指定的项目序列

  • Switch — 将发出 Observables 的 Observable 转换为单个 Observable,该 Observable 发出由最近发出的 Observables 发出的项目

  • Zip — 通过指定函数将多个 Observable 的发射组合在一起,并根据该函数的结果为每个组合发射单个项目

我认为你可以使用zipcombineLatest

阅读此文档http://reactivex.io/documentation/operators.html#combining

我想你也需要知道

RxJava 调度器简介。

  • Schedulers.io() - 这用于执行非 CPU 密集型操作,例如进行网络调用、读取磁盘/文件、数据库操作等,它维护一个线程池。

  • Schedulers.newThread() – 使用它,每次调度任务时都会创建一个新线程。通常建议不要使用调度程序,除非有一个非常长时间运行的操作。通过 newThread() 创建的线程不会被重用。

  • Schedulers.computation() - 该调度器可用于执行 CPU 密集型操作,如处理大量数据、位图处理等,使用此调度器创建的线程数完全取决于 CPU 数量可用的内核。

  • Schedulers.single() - 此调度程序将按添加的顺序执行所有任务。这可以在需要顺序执行时使用。

  • Schedulers.immediate() - 此调度程序通过阻塞主线程以同步方式立即执行任务。

  • Schedulers.trampoline() - 它以先进先出的方式执行任务。将后台线程数限制为1个,所有的定时任务都会一个一个地执行。

  • Schedulers.from() - 这允许我们通过限制要创建的线程数从执行程序创建调度程序。当线程池被占用时,任务会排队。

【讨论】:

  • 我试图用 zip 或 combineLatest 来做这件事,这看起来不错,但工作时间与异步风格完全相同或更像,所以这种并行不起作用。我在堆栈上找到了带有信息的答案,我们不能使用 zip 或 combinelatest:stackoverflow.com/questions/44643106/…
  • 我经常使用它,我认为你用错了,你应该阅读链接中的 cmets 或其他答案,
  • flatMap 只是 concat observables 意味着新的 Observable 等到前一个发出值而不是同时调用它们。如果它不适合你,我认为这是因为你使用了错误的 Schaduler
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-16
  • 1970-01-01
  • 1970-01-01
  • 2014-11-16
相关资源
最近更新 更多