【问题标题】:How can I update an Observable without losing active subscribers?如何在不丢失活跃订阅者的情况下更新 Observable?
【发布时间】:2020-10-15 16:15:22
【问题描述】:

我目前在尝试使用 publishReplayrefCount 更新缓存值时遇到了一些麻烦。

首先,有问题的方法

getCurrentUser(): Observable<User> {
    if (!this.jwtService.getToken() || this.jwtService.getAnonymousToken()) { return of(null); }
    if (!this.currentUser$) {
      console.warn('user.service.getcurrentuser -> load and CACHE it once');
      this.currentUser$ = this.userService.loadCurrentUser().pipe(
        // https://medium.com/angular-in-depth/fastest-way-to-cache-for-lazy-developers-angular-with-rxjs-444a198ed6a6
        publishReplay(1),
        refCount()
      );
    }
    console.log('user.service.getcurrentuser -> get from CACHE');
    return this.currentUser$;
 }

this.currentUser$User 类型的 Observable。

该方法本身按预期工作,该方法的第一次调用从服务器加载当前用户。随后的调用将获取缓存的条目。 然而,我现在要做的是删除缓存的值并从服务器重新加载用户(即在前端编辑了某些内容并且需要重新加载用户数据)。

我做到了

removeCachedUser(): void {
  this.currentUser$ = null;
}

通过这样做,getCurrentUser 方法中的if 条件被触发,用户从服务器重新加载。到目前为止,一切顺利。

我面临的问题是在缓存重置之前订阅getCurrentUser 的方法没有收到任何更新,即

ngOnInit() {
  this.currentUser$ = this.authService.getCurrentUser();
}

我怀疑这是因为我覆盖了this.currentUser$ Observable。

this.currentUser$ = this.userService.loadCurrentUser()

This post 或多或少地证实了我的理论。

我说的对吗?如果是这样,处理这个问题的正确方法是什么?不幸的是,我没有反应式后端,所以我必须手动触发重新加载。提前致谢!

【问题讨论】:

    标签: angular typescript rxjs


    【解决方案1】:

    您必须创建一个可以多次发出的 Observable 才能向活跃的订阅者发送新值。在您当前的设置中,this.currentUser$this.userService.loadCurrentUser() 完成时完成。我怀疑loadCurrentUser 是一个发出一次并完成的http 请求。所以this.currentUser$ 不能多次发射。

    要创建一个可以多次发射的 Observable,请使用 Subject 作为源并映射到您的 http 请求。要让用户在调用reloadUser() 之前第一次订阅currentUser$Subject 应该是BehaviorSubject

    private _reloadUser = new BehaviorSubject<void>(0 as void);
    
    currentUser$ = this._reloadUser.pipe(
      switchMap(() => {
        if (!this.jwtService.getToken() || this.jwtService.getAnonymousToken()) { 
          return of(null); 
        }
        console.warn('user.service.getcurrentuser -> load and CACHE it');
        return this.userService.loadCurrentUser();
      }),
      shareReplay(1),
      tap(user => console.log('user from CACHE:', user))
    ) 
    
    reloadUser() {
      this._reloadUser.next();
    }
    

    使用switchMap 调用reloadUser() 将取消任何正在进行的加载请求并发送新的加载请求。如果当前正在加载用户,如果对 reloadUser() 的调用不应该重新加载用户,您也可以使用 exhaustMap 而不是 switchMap

    shareReplay(1) 在这种情况下比publishReplay(1), refCount() 更适合,因为它默认没有refCount 功能。对于publishReplay(1), refCount(),如果没有任何订阅者,currentUser$ 的新订阅者将触发加载请求。在这种情况下,shareReplay(1) 将改为从“缓存”返回当前用户。

    【讨论】:

    • 一点点:switchMap 在这里似乎比排气映射更合适,至少在当前用户可以更改为其他用户的情况下,因为排气映射如果发生得太快可能会“错过”。
    【解决方案2】:

    你是对的,this.currentUser$ = this.userService.loadCurrentUser() 行覆盖了属性

    解决方案

    像下面这样创建currentUser$ Subject

    currentUserSubject$ = new BehaviourSubject<User | null>(null);
    currentUser$ = this.currentUserSubject$.asObservable();
    
    

    现在我们将您的代码重构为

    getCurrentUser(): Observable<User> {
      if (!this.jwtService.getToken() || this.jwtService.getAnonymousToken()) { 
        return of(null); 
      }
        if (!this.currentUser$.value) {
          console.warn('user.service.getcurrentuser -> load and CACHE it once');
          return this.userService.loadCurrentUser().pipe(
            tap(user => this.currentUserSubject$.next(user)),
            // https://medium.com/angular-in-depth/fastest-way-to-cache-for-lazy-developers-angular-with-rxjs-444a198ed6a6
            publishReplay(1),
            refCount()
          );
        }
        console.log('user.service.getcurrentuser -> get from CACHE');
        return this.currentUser$;
     }
    

    清除缓存功能可以改为

    removeCachedUser(): void {
      this.currentUserSubject$.next(null);
    }
    

    注意tap(user =&gt; this.currentUserSubject$.next(user)),这一行,我们通过在Subject上调用next()函数来设置我们的主题值

    【讨论】:

      【解决方案3】:

      您也可以使用shareReplay 结合takeUntil 删除缓存。

      https://stackoverflow.com/questions/47926240/when-should-i-use-publishreplay-vs-sharereplay/47926453#:~:text=publishReplay%20allows%20you%20to%20controls,to%20worry%20about%20unsubscribing%20etc.

      private _destroyCache$ = new Subject<void>();
      
      getCurrentUser(): Observable<User> {
          if (!this.jwtService.getToken() || this.jwtService.getAnonymousToken()) { return of(null); }
          if (!this.currentUser$) {
              console.warn('user.service.getcurrentuser -> load and CACHE it once');
              this.currentUser$ = this.userService.loadCurrentUser().pipe(
                  takeUntil(this._destroyCache$),
                  shareReplay(1),
              );
          }
          console.log('user.service.getcurrentuser -> get from CACHE');
          return this.currentUser$;
      }
      
      removeCachedUser(): void {
          this.currentUser$ = null;
          this._destroyCache$.next();
          this._destroyCache$.compleate();
          this._destroyCache$ = new Subject<void>();
      }
      

      在组件上你可以有这样的东西:

      ngOnInit() {
          loadData();
      }
      
      loadData() {
          this.currentUser$ = this.authService.getCurrentUser();
      }
      
      refreshData() {
          this.authService.removeCachedUser();
          loadData();
      }
      

      【讨论】:

        猜你喜欢
        • 2020-11-24
        • 1970-01-01
        • 2018-09-08
        • 1970-01-01
        • 1970-01-01
        • 2020-10-17
        • 2017-03-03
        • 2020-04-19
        • 1970-01-01
        相关资源
        最近更新 更多