【问题标题】:Observable caches emitted items or does not?Observable 是否缓存已发出的项目?
【发布时间】:2016-05-16 14:23:31
【问题描述】:

Observable 是否缓存发射的项目?我有两个测试导致我得出不同的结论:

从测试 #1 我得出的结论是:

测试 #1:

Observable<Long> clock = Observable
    .interval(1000, TimeUnit.MILLISECONDS)
    .take(10)
    .map(i -> i++);

//subscribefor the first time
clock.subscribe(i -> System.out.println("a: " + i)); 

//subscribe with 2.5 seconds delay
Executors.newScheduledThreadPool(1).schedule(
    () -> clock.subscribe(i -> System.out.println("  b: " + i)),
    2500,
    TimeUnit.MILLISECONDS
);

输出 #1:

a: 0
a: 1
a: 2
  b: 0
a: 3
  b: 1

但是第二个测试表明我们得到了两个观察者的不同值:

测试 #2:

Observable<Integer> observable  = Observable
                .range(1, 1000000)
                .sample(7, TimeUnit.MILLISECONDS);
observable.subscribe(i -> System.out.println("Subscriber #1:" + i));
observable.subscribe(i -> System.out.println("Subscriber #2:" + i)); 

输出 #2:

Subscriber #1:72745
Subscriber #1:196390
Subscriber #1:678171
Subscriber #2:336533
Subscriber #2:735521

【问题讨论】:

    标签: java rx-java reactive-programming


    【解决方案1】:

    存在两种 Observable:热和冷。 Cold observables 倾向于为其 Observers 生成相同的序列,除非你有外部效果,例如基于计时器的操作,与之相关联。

    在第一个示例中,您会获得两次相同的序列,因为除了计时器滴答声之外没有任何外部效果,您会一次次获得。在第二个示例中,您对快速源进行采样,并且随时间采样具有不确定性:每一纳秒都很重要,因此即使是最轻微的不精确也会导致报告的值不同。

    【讨论】:

    • 但是 rx lib 如何知道代码是否有外部影响?由于热的 observables 可以随时转换为冷的,我的结论是冷的和热的 observables 都必须记住/缓存每个发射的项目,对吗?看起来有点无效
    • 你不知道,通常它甚至不相关。您可以选择使用 cache()、replay() 或 ReplaySubject 来记住序列。
    • 是的,这就是我感到困惑的原因。我可以调用 .cache() 但 rx 无论如何都会缓存整个项目列表。至少我从测试#1 中得出了这个结论
    • @zeroflagL 你知道如何“打破”测试#1 第二个订阅不会得到0 项目吗?我需要更大的超时时间吗?
    • @VolodymyrBakhmatiuk 在示例 #1 中,每个订阅者都有自己的序列,但所有序列都生成相同的值。如果你使用像i -&gt; Math.random()这样的映射函数,那么你会看到订阅者得到不同的值并且没有缓存。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-17
    • 1970-01-01
    相关资源
    最近更新 更多