【问题标题】:Make a ReplaySubject return only the last value on subscribe让 ReplaySubject 只返回订阅的最后一个值
【发布时间】:2021-05-01 23:34:50
【问题描述】:

我有一个奇怪的用例,我需要跟踪所有以前发出的事件。

感谢 ReplaySubject,到目前为止它运行良好。在每个新订阅者上,此主题都会重新发出之前的所有事件。

现在,对于特定场景,我需要能够只提供最新发布的事件(有点像 BehaviorSubject),但保持源相同的事件。

这是我正在努力实现的目标:stackblitz

import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.mySubject = new ReplaySubject();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
  }
}

const myEventManager = new EventManager();

myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");

myEventManager.fullSubscribe(v => {
  console.log("SUB 1", v);
});

myEventManager.subscribe(v => {
  console.log("SUB 2", v);
});

谢谢

【问题讨论】:

  • RxJS#last() 操作符会这样做
  • @MrkSef 不完全正确。 last() 不会按照他们要求的方式工作,它只会在可观察对象完成之前输出最后发布的事件。 OP 要求当订阅 ReplaySubject 时,“热”主题立即发出最后一个发布的事件(如 BehaviorSubject),而不是像默认一样发布给它的所有事件。
  • @PatrickRoberts 好点!好的,使用 RxJS#debounceTime(0)。我认为0 应该在这里工作,因为回放是同步发生的。如果没有,他可以将去抖时间延长一点。
  • 我想到了 debounceTime(0),问题是如果后续发射发生在同一个滴答上,只会收到最后一个。

标签: javascript rxjs replaysubject


【解决方案1】:

如果您跟踪已发布的事件数量,您可以使用skip

  subscribe(next, error?, complete?) {
    return this.mySubject.pipe(
      skip(this.publishCount - 1)
    ).subscribe(next, error, complete);
  }

这是一个StackBlitz 演示。

【讨论】:

    【解决方案2】:

    您可以通过操作BehaviorSubject 来实现类似于ReplaySubject 的行为,而不是强制ReplaySubject 表现得像BehaviorSubject

    import { BehaviorSubject, from, concat } from 'rxjs';
    import { scan, shareReplay } from 'rxjs/operators';
    
    class EventManager {
      constructor() {
        this.mySubject = new BehaviorSubject();
        this.allEmittedValues = this.mySubject.pipe(
          scan((xs, x) => [...xs, x], []),
          shareReplay(1)
        );
    
        // Necessary since we need to start accumulating allEmittedValues
        // immediately.
        this.allEmittedValues.subscribe();
      }
    
      dispose() {
        // ends all subscriptions
        this.mySubject.complete();
      }
    
      publish(value) {
        this.mySubject.next(value);
      }
    
      fullSubscribe(next, error, complete) {
        // First, take the latest value of the accumulated array of emits and
        // unroll it into an observable
        const existingEmits$ = this.allEmittedValues.pipe(
          take(1),
          concatMap((emits) => from(emits))
        );
        // Then, subscribe to the main subject, skipping the replayed value since
        // we just got it at the tail end of existingEmits$
        const futureEmits$ = this.mySubject.pipe(skip(1));
    
        return concat(existingEmits$, futureEmits$).subscribe(
          next,
          error,
          complete
        );
      }
    
      subscribe(next, error, complete) {
        return this.mySubject.subscribe(next, error, complete);
      }
    }
    

    【讨论】:

      【解决方案3】:

      为什么不在EventManager 上有一个ReplaySubjectBehaviorSubject 的实例?

      import { ReplaySubject, BehaviorSubject, from } from "rxjs";
      
      class EventManager {
        constructor() {
          this.replaySubject = new ReplaySubject();
          this.behaviorSubject = new BehaviorSubject();
        }
      
        publish(value) {
          this.replaySubject.next(value);
          this.behaviorSubject.next(value);
        }
      
        fullSubscribe(next, error, complete) {
          return this.replaySubject.subscribe(next, error, complete);
        }
      
        subscribe(next, error, complete) {
          return this.behaviorSubject.subscribe(next, error, complete);
        }
      }
      

      【讨论】:

      • 因为在现实中,它更复杂。我必须处理一系列主题。通过引入另一个 BehaviorSubject 会过多地增加内存负载。我希望有一种“简单”的方式来处理它......
      • @fred.kassi ReplaySubject 肯定会使用大量内存,但相比之下,BehaviorSubject 仅持有对 ReplaySubject 已持有值的额外引用,因此额外的根据BehaviorSubject,内存基本上是恒定的。除非您出于某种原因碰巧发布了非常大的字符串,否则我怀疑您不会注意到内存使用量的显着差异。
      猜你喜欢
      • 2018-09-04
      • 2019-10-26
      • 2019-02-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-23
      • 1970-01-01
      相关资源
      最近更新 更多