【问题标题】:RxJava zip that gets latest of other observableRxJava zip 获取最新的其他 observable
【发布时间】:2017-08-30 03:49:51
【问题描述】:

我想实现一个zip,即当其中一个可观察源发出数据时,它不会等待其他可观察源发出数据,而是获取最新发出的数据(如果有)。

obs1 --true-|->
obs2 --true-|->  
obs3 --1-2-3-4->

zip 应该执行带有参数的TriFunction

true, true, 1
true, true, 2
true, true, 3
true, true, 4

我希望我的问题有意义。

扩展问题

我已经解决了部分问题,我还有一个问题要给你们。 obs1obs2 是昂贵的操作,它们将发出 truefalse。我需要的是,在 obs3 的每次发射中,如果 obs1obs2 在之前的发射中它们中的任何一个发射 false,我都需要重新执行。我在上面写的最好的情况是obs1obs2obs3 的第一次发射时发射true

-------1------------2-----------------3------------->
---true/false---true/re-execute---true/re-execute--->
---true/false---true/re-execute---true/re-execute--->

编辑“两者”在扩展问题上具有误导性。

我的意思是,如果 obs1 在之前的发射中为假,请重新执行 obs1。如果obs2 在之前的发射中为假,则重新执行obs2。如果其中之一是false,则不要重新执行它们。

【问题讨论】:

  • 那么,您是说要合并每个来源的最新值?在这种情况下,您正在寻找 combineLatest - 是的,有时 RxJava 真的那么明显。 :-)
  • 事情就是这样,我得到了 1 个 observable,它需要来自 2 个 observable 的数据才能继续进行(有点像要求)。而且这些要求很昂贵,因此只执行一次是理想的。 @david.mihola
  • 哦,是的 combineLatest 是最合适的。谢谢@david.mihola
  • 顺便问一下@david.mihola,如果其中一个来源完成会发生什么?
  • @david.mihola 我运行了一个简单的代码来验证。 combineLatest 在完成之前等待最后一个 observable 完成。 gist.github.com/novodimaporo/06be7555fb579bd2a58392122f81a917

标签: rx-java rx-java2


【解决方案1】:

编辑:扩展问题是一个完全不同的问题,需要不同的运算符,例如:

Observable<Boolean> obs1 = ...
Observable<Boolean> obs2 = ...

Observable<Integer> obs3 = ...

Function3<Integer, Boolean, Boolean> func = ...

// store last result of obs1 and obs2
boolean[] lastResults = { false, false };

// for each main value
obs3.concatMap(v -> {
    // if any of the previous results were false
    if (!lastResults[0] || !lastResults[1]) {
        // run both obs1 and obs2 again
        return Observable.zip(obs1, obs2, (a, b) -> {
            // save their latest results
            lastResult[0] = a;
            lastResult[1] = b;
            // apply the function to get the output
            return func(v, a, b);
        });
    }
    // otherwise call the function with true
    return Observable.just(func.apply(v, true, true));
})
.subscribe(...);

【讨论】:

  • 嗨@akarnokd,更新了我的问题以显示我的整个问题。
  • 我的想法是像你说的那样使用withLatestFrom,然后在obs1obs2 上使用scan()。但是scan() 并没有太大帮助,因为我需要先进入扫描内部,然后才能访问累积值并在BiFunction 中处理我的逻辑,但是我的逻辑是用 rx 编写的,我不能在BiFunction 中使用它.
  • 哦,@akarnokd 先生,既然你把那个变量放在那里,我对 rx 的心态就坏了。真的可以吗?访问流外的数据?
  • 一次性消费的 observables,是的。如果您多次订阅,则必须将lastResultsobs3.concatMap(...) 部分放入Observable.defer(() -&gt; { ... })
  • withLatestFromscan 不会产生您在扩展问题中指定的行为。
猜你喜欢
  • 1970-01-01
  • 2018-06-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多