【问题标题】:How to use/control RxJava Observable.cache如何使用/控制 RxJava Observable.cache
【发布时间】:2020-06-27 08:01:05
【问题描述】:

我正在尝试使用 RxJava 缓存机制 (RxJava2),但我似乎无法理解它是如何工作的,或者我如何控制缓存的内容,因为有 cache 运算符。

我想在发出新数据之前用一些条件验证缓存的数据。

例如

someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache() 

我如何检查和过滤缓存值,如果成功则发出它,否则我将请求一个新值。

由于值会定期更改,因此我需要先验证缓存是否仍然有效,然后才能请求新缓存。

还有ObservableCache&lt;T&gt; 类,但我找不到任何使用它的资源。

任何帮助将不胜感激。谢谢。

【问题讨论】:

  • 数据源是一个还是多个?是什么导致数据发生变化,多久发生一次?这也可能有助于获得一些想法:stackoverflow.com/questions/31733455/…
  • 感谢您的评论。目前我只想用一个源实现缓存机制数据每隔几秒就会无限变化
  • Observable 有多少订阅者?我假设会有不止一个,但只是想确定一下。
  • 我正在使用一个主题在 observables 之间共享数据
  • 有什么更新吗?你能回答你的问题吗?

标签: reactive-programming rx-java2 rx-android


【解决方案1】:

这不是重放/缓存的工作方式。请先阅读#replay/#cache 文档。

重播

这个操作符返回一个 ConnectableObservable,它有一些方法 (#refCount/ #connect/ #autoConnect) 用于连接到源。

当#replay 被应用而没有过载时,源订阅被多播并且所有发出的值sind 连接将被重播。源订阅是惰性的,可以通过#refCount/#connect/#autoConnect连接源。

返回一个 ConnectableObservable,它共享对底层 ObservableSource 的单一订阅,该订阅将重播其所有项目和通知给任何未来的 Observer。

在没有任何连接方法的情况下应用#relay (#refCount/ #connect/ #autoConnect) 不会在订阅时发出任何值

Connectable ObservableSource 类似于普通的 ObservableSource,不同之处在于它不会在订阅时开始发射项目,而只是在调用其 connect 方法时。

重播(1)#autoConnect(-1) / #refCount(1) / #connect

应用 replay(1) 将缓存最后一个值,并将在每个订阅上发出缓存的值。 #autoConnect 将立即打开连接并保持打开状态,直到发生终端事件(onComplete、onError)。 #refCount 是 smiular,但会在所有订阅者消失时断开与源的连接。 #connect 运算符可以在您需要等待时使用,当所有对 observable 的订阅都已完成时,以免丢失值。

用法

#replay(1) -- 大部分应该用在 observable 的末尾。

sourcObs.
  .filter()
  .map()
  .replay(bufferSize)
  .refCount(connectWhenXSubsciberSubscribed) 

注意

在没有缓冲区限制或到期日期的情况下应用#replay 将导致内存泄漏,当您观察到无限时

缓存/cacheWithInitialCapacity

运算符类似于带有 autoConnect(1) 的 #replay。运算符将缓存每个值并在每个订阅上重播。

仅当第一个下游订阅者订阅并维护对该 ObservableSource 的单个订阅时,操作员才订阅。相反,返回 ConnectableObservable 的操作符系列 replay() 需要显式调用 ConnectableObservable.connect()。 注意:当您使用缓存 Observer 时,您牺牲了处理源的能力,因此请注意不要在 ObservableSources 上使用此 Observer,因为该 ObservableSources 会发出无限或非常多的将耗尽内存的项目。一种可能的解决方法是在应用 cache() 之前(也可能在之后)将 takeUntil 与谓词或其他源一起应用。

示例

    @Test
    fun skfdsfkds() {
        val create = PublishSubject.create<Int>()

        val cacheWithInitialCapacity = create
            .cacheWithInitialCapacity(1)

        cacheWithInitialCapacity.subscribe()

        create.onNext(1)
        create.onNext(2)
        create.onNext(3)

        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
    }

用法

使用缓存操作符,当你无法控制连接阶段时

当您希望 ObservableSource 缓存响应并且您无法控制所有观察者的订阅/处置行为时,这很有用。

注意

与 replay() 一样,缓存是无限的,可能会导致内存泄漏。

注意:容量提示不是缓存大小的上限。为此,请考虑将 replay(int) 与 ConnectableObservable.autoConnect() 或类似方法结合使用。

进一步阅读

https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/

https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/

【讨论】:

  • 是的,你是对的,因为我误解了缓存是如何工作的,因为它在弹珠图上的核心使用中显示,并且连接到多个订阅者。 reactivex.io/RxJava/2.x/javadoc/io/reactivex/…
  • 对多区域部署的全局缓存有什么想法吗?
【解决方案2】:

如果您的事件源 (Observable) 是昂贵的操作,例如从数据库中读取,则不应使用 Subject 来观察事件,因为这将为每个订阅者重复昂贵的操作。由于“OutOfMemory”异常,无限流缓存也可能存在风险。更合适的解决方案可能是ConnectableObservable,它只执行一次源操作,并将更新后的值广播给所有订阅者。

这是一个代码示例。我没有费心创建无限周期流或包含错误处理以保持示例简单。让我知道它是否满足您的需求。

class RxJavaTest {

    private final int maxValue = 50;

    private final ConnectableObservable<Integer> source =
            Observable.<Integer>create(
                subscriber -> {
                    log("Starting Event Source");
                    subscriber.onNext(readFromDatabase());
                    subscriber.onNext(readFromDatabase());
                    subscriber.onNext(readFromDatabase());
                    subscriber.onComplete();
                    log("Event Source Terminated");
                })
                .subscribeOn(Schedulers.io())
                .filter(value -> value < maxValue)
                .publish();

    void run() throws InterruptedException {
        log("Starting Application");

        log("Subscribing");
        source.subscribe(value -> log("Subscriber 1: " + value));
        source.subscribe(value -> log("Subscriber 2: " + value));

        log("Connecting");
        source.connect();

        // Add sleep to give event source enough time to complete
        log("Application Terminated");
        sleep(4000);
    }

    private Integer readFromDatabase() throws InterruptedException {
        // Emulate long database read time
        log("Reading data from database...");
        sleep(1000);

        int randomValue = new Random().nextInt(2 * maxValue) + 1;
        log(String.format("Read value: %d", randomValue));
        return randomValue;
    }

    private static void log(Object message) {
        System.out.println(
                Thread.currentThread().getName() + " >> " + message
        );
    }
}

这是输出:

main >> Starting Application
main >> Subscribing
main >> Connecting
main >> Application Terminated
RxCachedThreadScheduler-1 >> Starting Event Source
RxCachedThreadScheduler-1 >> Reading data from database...
RxCachedThreadScheduler-1 >> Read value: 88
RxCachedThreadScheduler-1 >> Reading data from database...
RxCachedThreadScheduler-1 >> Read value: 42
RxCachedThreadScheduler-1 >> Subscriber 1: 42
RxCachedThreadScheduler-1 >> Subscriber 2: 42
RxCachedThreadScheduler-1 >> Reading data from database...
RxCachedThreadScheduler-1 >> Read value: 37
RxCachedThreadScheduler-1 >> Subscriber 1: 37
RxCachedThreadScheduler-1 >> Subscriber 2: 37
RxCachedThreadScheduler-1 >> Event Source Terminated.

注意以下几点:

  • 事件仅在源上调用 connect() 时才开始触发,而不是在观察者订阅源时触发。
  • 每次事件更新仅调用一次数据库
  • 过滤后的值不会发送给订阅者
  • 所有订阅者都在同一个线程中执行
  • 由于并发,应用程序在事件处理之前终止。通常,您的应用将在事件循环中运行,因此您的应用将在缓慢运行期间保持响应。

【讨论】:

  • 对多区域部署的全局缓存有什么想法吗?
  • @nikita91000 我通常使用 RxJava 来同步同一应用程序/进程中的线程。听起来您可能正在寻找cloud-based solution
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-22
  • 2017-01-05
  • 2015-09-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多