【问题标题】:RxJS cache and refresh with shareReplayRxJS 使用 shareReplay 缓存和刷新
【发布时间】:2020-06-29 07:33:39
【问题描述】:

我正在对从 API 检索的一些数据使用缓存,出于逻辑原因,存储的数据仅在有限的时间内有效,所以我正在使用类似的东西:

someApiData$ = this.getData()
    .pipe(shareReplay(1, 3000))

对我来说似乎很明显但对shareReplay 运算符的创建者来说显然不是的是,如果数据不再被缓存,则应该重新获取它,或者至少我应该有另一个参数会给我这个选项,比如:

someApiData$ = this.getData()
    .pipe(shareReplay(1, 3000, shouldRefresh))

相反,下一个订阅者将获得 null。 所以,我正在寻找一个优雅的解决方案来解决这个问题。

【问题讨论】:

    标签: angular rxjs rxjs-pipeable-operators


    【解决方案1】:

    在对这个线程的答案和网络上的一些其他方法进行了一些跟踪之后,这就是我最终得到的。它提供了以下能力:

    1. 缓存值
    2. 如果数据不再缓存,则自动刷新值
    3. 直接使用Observable
    4. 如果需要,指定缓存生命周期的持续时间
    5. 整理我的服务并提供可重复使用的解决方案

    我的缓存工具:

    export class SharedReplayRefresh {
    
        private sharedReplay$: Observable<T>;
        private subscriptionTime: number;
    
    
       sharedReplayTimerRefresh(
           source: Observable<T>, bufferSize: number = 1,
           windowTime: number = 3000000,  scheduler?: SchedulerLike): Observable<T> {
    
            const currentTime = new Date().getTime();
            if (!this.sharedReplay$ || 
                currentTime - this.subscriptionTime > windowTime) {
                this.sharedReplay$ = source.pipe(shareReplay(
                    bufferSize, windowTime, scheduler));
                this.subscriptionTime = currentTime;
            }
    
            return this.sharedReplay$;
        }
    }
    

    我的数据服务:

    export class DataService {
        
        constructor(private httpClient: HttpClient) { }
    
        private dataSource = 
            new SharedReplayRefresh<Data>();
        private source = this.httpClient.get<Data>(url);
        
        get data$(): Observable<Data> {
            return this.dataSource .sharedReplayTimerRefresh(this.source, 1, 1500);
        }
    }
    

    【讨论】:

      【解决方案2】:

      根据documentationshareReplay运算符的window参数不是这样工作的:

      这个缓冲区中的项目可以被丢弃而不发送给后续观察者的年龄,以毫秒为单位

      在您的代码示例中,这意味着 3 秒后新订阅者将什么也得不到。

      我认为处理这个问题的最好方法是使用外部计数器来处理它:

        private cache$: Observable<any>;
        private lastTime: number;
      
        public getCachedData() {
          if (!this.cache$ || new Date().getTime() - this.lastTime > 3000) {
            this.cache$ = this.getData().pipe(shareReplay(1));
            this.lastTime = new Date().getTime();
          }
      
          return this.cache$;
        }
      

      每次新订阅者调用getCachedData()时,此代码都会“重新创建” Observable。

      但是,旧订阅者不会获得重新创建的新 Observable 的更新。为了使它们保持同步,您可能需要使用BehaviorSubject 来存储数据:

        // Everybody subscribe to this Subject
        private data$ = new BehaviorSubject(null);
      
        public getCachedData() {
          // TODO check time expiration here and call this.refreshData();
          if(timeExpired) {
            return this.refreshData().pipe(
              mergeMap(data => {
                return this.data$.asObservable();
              })
            );
          } else {
            return this.data$.asObservable();
          }
        }
        
        private refreshData() {
          return this.getData().pipe(
            tap(data => {
              this.data$.next(data);
            })
          );
        }
      
      

      上述解决方案只是一个想法,应该改进和测试。

      【讨论】:

      • 我正在考虑做类似的事情,我面临的问题是年长的订阅者会不断收到通知,这是我可能想要避免的。
      【解决方案3】:

      这是一种方法:

      const URL = 'https://jsonplaceholder.typicode.com/todos/1';
      
      const notifier = new Subject();
      const pending = new BehaviorSubject(false);
      
      const cacheEmpty = Symbol('cache empty')
      
      const shared$ = notifier.pipe(
        withLatestFrom(pending),
        filter(([_, isPending]) => isPending === false),
      
        switchMap(() => (
          console.warn('[FETCHING DATA]'),
          pending.next(true),
          fetch(URL).then(r => r.json())
        )),
      
        tap(() => pending.next(false)),
        shareReplay(1, 1000),
      );
      
      const src$ = shared$.pipe(
        mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
        tap(v => v === cacheEmpty && notifier.next()),
        filter(v => v !== cacheEmpty)
      )
      
      src$.subscribe(v => console.log('[1]', v));
      
      setTimeout(() => {
        src$.subscribe(v => console.log('[2]', v));
      }, 500);
      
      setTimeout(() => {
        src$.subscribe(v => console.log('[3]', v));
      }, 1200);
      

      StackBlitz.

      mergeWithimport { merge as mergeWith } from 'rxjs/operators'(我认为从 RxJs 7 开始,它可以直接作为mergeWith 访问)。

      我的理由是我需要找到一种方法来确定正在使用的ReplaySubject 的缓存是否为空。众所周知,如果缓存不为空并且有新订阅者到达,它将同步发送缓存值。

      所以,

      mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
      

      本质上是一样的

      merge(
        shared$,
        of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
      )
      

      如果缓存中有值,shared$ 将发出,#2 将被取消订阅。

      如果没有值,#2 将发出然后完成(它完成的事实不会影响外部 observable)。

      接下来,我们看到如果cacheEmpty已经发出,那么我们就知道是时候刷新数据了。

      tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
      filter(v => v !== cacheEmpty)
      

      现在,让我们看看notifier 的工作原理

      const shared$ = notifier.pipe(
        
        // These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
        // the source won't be subscribed more than needed
        withLatestFrom(pending),
        filter(([_, isPending]) => isPending === false),
      
        switchMap(() => (
          console.warn('[FETCHING DATA]'),
          pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
          fetch(URL).then(r => r.json())
        )),
      
        // The request has finished, we have the new data
        tap(() => pending.next(false)),
      
        shareReplay(1, 1000),
      );
      

      【讨论】:

        【解决方案4】:

        我有一个类似的用例,最终使用了以下自定义运算符。

        import { Observable } from 'rxjs';
        import { tap } from 'rxjs/operators';
        
        export const cacheValue = <T>(windowTime: (value: T) => number) => (
          source: Observable<T>,
        ) => {
          let cache: { value: T; expires: number } | undefined = undefined;
          return new Observable<T>((observer) => {
            if (cache && cache.expires > Date.now()) {
              observer.next(cache.value);
              observer.complete();
            } else {
              return source
                .pipe(
                  tap(
                    (value) =>
                      (cache = { value, expires: Date.now() + windowTime(value) }),
                  ),
                )
                .subscribe(observer);
            }
          });
        };
        

        如果您的缓存在 100 毫秒后过期,则将其称为 cacheValue(() =&gt; 100),如果 API 返回的值具有 expiresIn 属性,则将其称为 cacheValue((value) =&gt; value.expiresIn)

        【讨论】:

          【解决方案5】:

          您可以更改 Stream 的启动方式,在这种情况下,使用 interval 创建一个立即发出的流,然后在满足间隔时使用它来触发数据加载。

          当您第一次订阅流时,触发间隔,加载数据,然后在三秒后再次。

          import { interval } from 'rxjs';
          
          const interval$ = interval(3000); // emits straight away then every 3 seconds
          

          interval$ 发出时,使用switchMap 切换ObservableshareReplay 以允许多播。

          // previous import
          import { switchMap, shareReplay } from 'rxjs/operators';
          
          // previous code
          
          const data$ = interval$.pipe(
              switchMap(() => getData()),
              shareReplay()
          );
          

          您还可以将 interval$ 包含在 merge 中,这样您就可以根据 Subject 创建手动刷新,就像您的 interval 一样。

          import { BehaviorSubject, merge, interval } from "rxjs";
          import { shareReplay, switchMap } from "rxjs/operators";
          
          const interval$ = interval(3000);
          const reloadCacheSubject = new BehaviorSubject(null);
          
          const data$ = merge(reloadCacheSubject, interval$).pipe(
            switchMap(() => getData()),
            shareReplay()
          );
          
          reloadCacheSubject.next(null); // causes a reload
          

          StackBlitzmergerefreshCache Subject 的示例

          【讨论】:

            【解决方案6】:

            老问题,但我和你有同样的错误。意识到如果“api 数据”为空,我只需要回退 api 调用。新代码以粗体显示。

            someApiData$ = someApiData$ || this.getData().pipe(shareReplay(1, 3000))

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 2020-11-06
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2011-09-24
              • 2012-07-02
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多