【问题标题】:RxJava operator like replay(1, 1, MINUTE) but which resubscribes after 1 minute passedRxJava 运算符,如 replay(1, 1, MINUTE),但在 1 分钟后重新订阅
【发布时间】:2019-09-26 18:30:24
【问题描述】:

我需要一种相对简单的过期缓存机制(比如 1 分钟)。

  1. 当第一个订阅者订阅时,我想进行 API 调用。
  2. 当第二个在一分钟内订阅时,不想进行 API 调用,而是向下游推送之前加载的值。
  3. 当另一个人在第一个订阅一分钟后订阅时,我希望再次进行 API 调用。

现在,我想在单个 rxjava 链中执行此操作。 .replay(1, 1, MINUTE) 看起来很完美,直到我了解到,一分钟过去后,源 observable 不会重新订阅。我再也没有从那个可观察到的东西中得到任何东西。我可能需要将replay()repeatWhen{} 合并但找不到的东西。我尝试了非常奇特的组合,但没有一个适用于我的测试用例。

【问题讨论】:

  • 您是否尝试过将.interval(1, MINUTE).replay(1, 1, MINUTE) 结合使用? interval() 每分钟调用一次 API。
  • 问题是我不想每分钟都打这个电话。我只想在需要时拨打电话,但不超过一分钟。

标签: android kotlin rx-java reactive-programming rx-java2


【解决方案1】:

这可能不是最好的解决方案,但我会尝试这样做:

    public final class SimpleCacheSingle<T : Any> constructor(
     val apiRequest: (value: String, callback: (T) -> Unit) -> Unit
 ) {
     private var lastTimeSeconds = 0L
     private lateinit var cachedValue: T

     fun getSingle(): Single<T> = Single.create { emitter ->
         if (System.currentTimeMillis() / 1000 - lastTimeSeconds > 60) {
             apiRequest("example argument") {
                 cachedValue = it
                 lastTimeSeconds = System.currentTimeMillis() / 1000
                 emitter.onSuccess(cachedValue)
             }
         } else {
             emitter.onSuccess(cachedValue)
         }
     }
 }

只需创建它的一个实例,然后使用 getSingle() 为每个订阅者创建一个。

当然附加代码sn-p中的“apiRequest”,需要修改以满足你的需要。

编辑: 请注意,当您在之前的 api 调用完成之前订阅时,您将有两个或多个待处理的 API 请求,而不是一个。 您必须修改代码,以便一次只有一个请求。

【讨论】:

  • 感谢@Dominik Setniewski,这当然可以,但我想要一个优雅的单链解决方案。不过,可能没有简单的方法来做到这一点。
【解决方案2】:

我认为这段代码(这里的时间间隔是 5 秒而不是 1 分钟。测试更容易)应该可以解决问题:

DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneId.systemDefault());
AtomicInteger subscriberCounter = new AtomicInteger(0);
PublishProcessor<String> repeater = PublishProcessor.create();

Flowable<String> flowable =
    Flowable.defer(() -> Flowable.just(apiCall()))
            .repeatWhen(handler -> repeater
                                     .skip(5, SECONDS) // don't trigger a repeat for any new subscription between 0 and 5 seconds
                                     .throttleFirst(5, SECONDS) // trigger a repeat and ignore any new notification during 5 sec
            )
            .replay(1)
            .autoConnect()
            .doOnSubscribe(s -> {
                System.out.println(formatter.format(now())
                                    + " -----> new subscription of subscriber #"
                                    + subscriberCounter.incrementAndGet());
                repeater.onNext("whatever"); // notify a new subscription to the repeat handler
            });

flowable.subscribe(s ->System.out.println("subscriber #1 receives: " + s));
Thread.sleep(3000);
flowable.subscribe(s -> System.out.println("subscriber #2 receives: " + s));
Thread.sleep(4000);
flowable.subscribe(s -> System.out.println("subscriber #3 receives: " + s));
Thread.sleep(100);
flowable.subscribe(s -> System.out.println("subscriber #4 receives: " + s));
Thread.sleep(6000);
flowable.subscribe(s -> System.out.println("subscriber #5 receives: " + s));
Thread.sleep(1000);
flowable.subscribe(s -> System.out.println("subscriber #6 receives: " + s));
Thread.sleep(6000);
flowable.subscribe(s -> System.out.println("subscriber #7 receives: " + s));

Flowable.timer(60, SECONDS) // Just to block the main thread for a while
        .blockingSubscribe();

这给出了:

16:54:54 -----> new subscription of subscriber #1
subscriber #1 receives: API call #0
16:54:57 -----> new subscription of subscriber #2
subscriber #2 receives: API call #0
16:55:01 -----> new subscription of subscriber #3
subscriber #1 receives: API call #1
subscriber #2 receives: API call #1
subscriber #3 receives: API call #1
16:55:01 -----> new subscription of subscriber #4
subscriber #4 receives: API call #1
16:55:07 -----> new subscription of subscriber #5
subscriber #1 receives: API call #2
subscriber #2 receives: API call #2
subscriber #3 receives: API call #2
subscriber #4 receives: API call #2
subscriber #5 receives: API call #2
16:55:08 -----> new subscription of subscriber #6
subscriber #6 receives: API call #2
16:55:14 -----> new subscription of subscriber #7
subscriber #1 receives: API call #3
subscriber #2 receives: API call #3
subscriber #3 receives: API call #3
subscriber #4 receives: API call #3
subscriber #5 receives: API call #3
subscriber #6 receives: API call #3
subscriber #7 receives: API call #3

也许我们可以做得更好,但我现在没有其他想法。

【讨论】:

    【解决方案3】:

    您可以使用以下运算符

    1. .retryWhen() 运算符重新订阅。
    2. .delay() 操作符来延迟订阅者:Delay 操作符通过在发出每个源 Observable 的项目之前暂停特定的时间增量(您指定)来修改其源 Observable。这具有将 Observable 发出的整个项目序列向前移动指定增量的效果。

    【讨论】:

    • 出错后重试重订。延迟只会让我更多地等待结果。它们不提供任何缓存功能,除非我错过了什么。
    猜你喜欢
    • 2015-09-03
    • 1970-01-01
    • 1970-01-01
    • 2013-04-02
    • 2014-10-29
    • 2020-11-27
    • 1970-01-01
    • 1970-01-01
    • 2018-06-20
    相关资源
    最近更新 更多