【问题标题】:RX: Run Zipped Observables in parallel?RX:并行运行压缩的 Observables?
【发布时间】:2014-01-18 06:52:45
【问题描述】:

所以我在玩 RX(真的很酷),我一直在转换我的 api,它访问 Android 中的 sqlite 数据库以返回 observables。

所以我开始尝试解决的问题之一自然是,“如果我想进行 3 次 API 调用,获取结果,然后在它们全部完成后进行一些处理怎么办?”

我花了一两个小时,但我最终找到了Zip Functionality,它可以轻松帮助我:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

太棒了!所以这很酷。

所以当我压缩 3 个 observables 时,它们会串行运行。如果我想让它们同时并行运行,这样我最终能更快地得到结果怎么办?我玩过一些东西,甚至尝试阅读人们写的一些原始 RX 东西在 C# 中。我确信有一个简单的答案。谁能指出我正确的方向?这样做的正确方法是什么?

【问题讨论】:

  • 如果我可能会问,您为什么要等到三个都完成后再进行处理?
  • @ScottSEA 当然,所以说我有一个屏幕,需要来自 SQLite 的 3 个不同元素才能正确绘制,或者 3 个不同的网络信息。在继续绘制屏幕之前,我想确保我拥有一切。
  • 所以您不是等到序列完成,而是等到每个序列都有一个值?
  • @ScottSEA 我设置 api 的方式是一切都只返回一个对象然后完成。如果您觉得有更好的方法来设置它,我会全力以赴。我对 RX 很陌生,很想听听您的意见。
  • @ScottSEA 即使我要返回多个项目,我也只会发送一个项目列表而不是多个 onNext 调用。

标签: java system.reactive rx-java


【解决方案1】:

zip 确实并行运行 observables - 但它也订阅 串行。由于您的getNumberedObservable 是在订阅方法中完成的,它给人的印象是串行运行,但实际上并没有这样的限制。

您可以尝试使用一些长期运行的 Observable,这些 Observable 的订阅逻辑比它们的订阅逻辑长,例如 timer,或者使用 subscribeOn 方法异步订阅传递给 zip 的每个流。

【讨论】:

  • 啊!我不敢相信我忘了使用 subscribeOn。感谢您指出了这一点。我测试了它并且它有效。所以现在我的问题是,如果我希望它们全部连续运行,我会在一个线程上订阅它们,还是有更好的方法将所有可观察对象组合在一起以连续运行它们?谢谢!
  • 如果你想连续运行它们,你可能来错了 API! :) 开个玩笑,你可以做到,但它很繁琐。如果流都是相同的类型,你可以使用concat链接它们,如果它们都不同,那么你可以创建一个类型来保存n个结果,然后使用concatselect来投影每个流的结果到类型中的占位符并使用scan 来累积单个结果。您也可以只订阅前面的 onCompleted 中的每个连续流。
  • 您还可以使用selectMany 将前一个流的结果投影到下一个流的查询中。如果将一个流的结果传递给下一个流,则此方法效果很好。
  • 此外,不能保证在单个线程上全部订阅它们 - subscribe 方法在运算符完成之前返回是很常见的。
【解决方案2】:

在 RxJava 中,使用 toAsync 将常规函数转换为将在线程上运行并将其结果返回到 observable 的东西。

我不太了解 Java 语法,但它看起来像:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

如果getNumber 真的在访问数据库,这将起作用。当您调用 getNumberedObservable 时,它会返回一个 observable,当您订阅它时,它将在单独的线程上运行 getNumber

【讨论】:

    【解决方案3】:

    我也尝试过这样做,使用 zip 并行运行多个线程。我结束了打开new so question 并得到了答案。基本上,您必须为每个 observable 订阅一个新线程,因此如果您想使用 zip 并行运行三个 observable,您必须订阅 3 个单独的线程。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-12-12
      • 2022-06-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多