这是一种方法:
const URL = 'https://jsonplaceholder.typicode.com/todos/1';
const notifier = new Subject();
const pending = new BehaviorSubject(false);
const cacheEmpty = Symbol('cache empty')
const shared$ = notifier.pipe(
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true),
fetch(URL).then(r => r.json())
)),
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
const src$ = shared$.pipe(
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
tap(v => v === cacheEmpty && notifier.next()),
filter(v => v !== cacheEmpty)
)
src$.subscribe(v => console.log('[1]', v));
setTimeout(() => {
src$.subscribe(v => console.log('[2]', v));
}, 500);
setTimeout(() => {
src$.subscribe(v => console.log('[3]', v));
}, 1200);
StackBlitz.
mergeWith 是import { merge as mergeWith } from 'rxjs/operators'(我认为从 RxJs 7 开始,它可以直接作为mergeWith 访问)。
我的理由是我需要找到一种方法来确定正在使用的ReplaySubject 的缓存是否为空。众所周知,如果缓存不为空并且有新订阅者到达,它将同步发送缓存值。
所以,
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
本质上是一样的
merge(
shared$,
of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
)
如果缓存中有值,shared$ 将发出,#2 将被取消订阅。
如果没有值,#2 将发出然后完成(它完成的事实不会影响外部 observable)。
接下来,我们看到如果cacheEmpty已经发出,那么我们就知道是时候刷新数据了。
tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
filter(v => v !== cacheEmpty)
现在,让我们看看notifier 的工作原理
const shared$ = notifier.pipe(
// These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
// the source won't be subscribed more than needed
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
fetch(URL).then(r => r.json())
)),
// The request has finished, we have the new data
tap(() => pending.next(false)),
shareReplay(1, 1000),
);