【问题标题】:Get the latest value of an Observable and emit it immeditely获取 Observable 的最新值并立即发出
【发布时间】:2016-08-28 19:09:24
【问题描述】:

我正在尝试获取给定 Observable 的最新值并让它发出 一旦被调用就立即。以下面的代码为例:

return Observable.just(myObservable.last())
    .flatMap(myObservable1 -> {
        return myObservable1;
    })
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object

这不起作用,因为这样做flatMap 将发出myObservable1,而myObservable1 发出到达map。 我不知道做这样的事情是否可能。有没有人知道如何实现这个目标?谢谢

【问题讨论】:

  • 你在说什么可观察的?最新是什么意思?
  • myObservable 是一个热可观察对象,它以不规则的间隔发出例如:“1”、“2”、“3”。我想要做的是能够在达到return 指令时获得myObservable 最新值(在我们的例子中为“3”)
  • myObservable 是热的还是冷的?
  • 看来您仍在以命令式(而非反应式)的方式思考。能不能给个更大的视角?重点是通常,您基于其他可观察对象创建可观察对象。您可以基于 myObservable 创建 lastObservable,它将被阻塞直到 myObservable 结束,然后发出最后一个值。除了订阅这样的端点之外,没有办法摆脱这些流“管”。
  • 我曾经在 RxJava Google Group 上问过一个类似的问题——也许 Ben 的回答对你有帮助:groups.google.com/forum/#!searchin/rxjava/mihola/rxjava/…

标签: java rx-java reactive-programming rx-android


【解决方案1】:

last() 方法在这里不会有任何帮助,因为它会等待 Observable 终止给你发出的最后一个项目。

假设您无法控制发出的 observable,您可以简单地创建一个 BehaviorSubject 并将其订阅到发出您想要侦听的数据的 observable,然后订阅创建的主题。因为Subject 既是Observable 又是Subscriber,你会得到你想要的。

我认为(现在没有时间检查)您可能必须手动取消订阅原始 observable,因为 BehaviorSubject 一旦他的所有订阅者取消订阅不会自动取消订阅。

类似这样的:

BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any.
    // You can also always supply the first item to the behavior subject
});

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html

【讨论】:

    【解决方案2】:

    在 RxJava 中,subscriber.onXXX 被称为异步。这意味着如果您的 Observable 在新线程中发出项目,则您永远无法获得返回之前的最后一个项目,除非您阻塞线程并等待该项目。但是如果 Observable同步发射项目,您不会通过 subscribeOn 和 observOn 更改它的线程, 如代码:

    Observable.just(1,2,3).subscribe();
    

    在这种情况下,您可以通过以下方式获取最后一项:

    Integer getLast(Observable<Integer> o){
        final int[] ret = new int[1];
        Observable.last().subscribe(i -> ret[0] = i);
        return ret[0];
    }
    

    这样做是个坏主意。RxJava 更喜欢你用它来做异步工作。

    【讨论】:

      【解决方案3】:

      您在这里真正想要实现的是获取一个异步任务并将其转换为同步任务。

      有几种方法可以实现,每一种都各有利弊:

      • 使用toBlocking() - 这意味着该线程将被阻塞,直到流完成,为了只获得一个项目,只需使用first(),因为一旦交付了一个项目,它就会完成。 假设您的整个流是Observable&lt;T&gt; getData(); 那么立即获取最后一个值的方法将如下所示:

      public T getLastItem(){ return getData().toBlocking().first(); }

      请不要使用last(),因为它会等待流完成,然后才会发出最后一项。

      如果您的流是网络请求并且它还没有获得任何项目,这将阻塞您的线程!所以只有在您确定有立即可用的项目时才使用它(或者如果您真的想要一个阻塞。 ..)

      • 另一种选择是简单地缓存最后一个结果,如下所示:

        getData().subscribe(t-> cachedT = t;) //在代码中的某个地方,它将继续保存最后交付的项目 公共T getLastItem(){ 返回缓存的T; }

      如果在您请求时没有发送任何项目,您将获得 null 或您设置的任何初始值。 这种方法的问题是订阅阶段可能发生在获取之后,如果在 2 个不同的线程中使用,可能会产生竞争条件。

      【讨论】:

      • 我无法按照此处的代码进行操作; getLastItem应用于什么类,getData()函数的细节是什么?
      • GetData 只是为了强调您正在使用的流。您可以将其替换为您正在使用的可观察对象,例如原始示例中的 myObservable Observable myObservable;然后 myObservable.toBlocking().first() 到达后会给你第一个项目。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-07-26
      • 1970-01-01
      • 1970-01-01
      • 2021-11-25
      • 1970-01-01
      • 2023-04-02
      • 1970-01-01
      相关资源
      最近更新 更多