【问题标题】:rxjs5/Angular - Clear ReplaySubject bufferrxjs5/Angular - 清除 ReplaySubject 缓冲区
【发布时间】:2017-06-11 02:09:15
【问题描述】:

我有一个 Angular 服务,它异步地向多个组件共享数据流(http 调用)。我需要不时根据用户操作调用 http 服务。

我正在使用 ReplaySubject 来保存加载的值并发送给在 http 调用之后订阅的订阅者。

我想知道是否有办法在进行后续 http 调用之前清除 ReplaySubject 的缓冲区? 在此期间,我怀疑我还需要取消订阅才能不造成泄漏?

服务:

@Injectable()
export class GreatDataService {

    public data$: ReplaySubject<any>;
    private subs: Subscription;

    constructor(private http: Http) {
        this.data$ = new ReplaySubject(1);
    }

    public refresh() {
        if (this.subs) {
            this.subs.unsubscribe();
            this.subs = null;
        }
        this.subs = this.http.get('/api').subscribe(this.data$)
    }
}

顶级部分组件:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
         this.greatDataService.refresh();
    }
...

组件 1:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
    this.greatDataService.data$.subscribe(
        x => console.log('subscriber 1: ' + x),
        err => console.log('subscriber 1: ' + err),
        () => console.log('subscriber 1: Completed')
    );
...

组件 2:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
    this.greatDataService.data$.subscribe(
        x => console.log('subscriber 2: ' + x),
        err => console.log('subscriber 2: ' + err),
        () => console.log('subscriber 2: Completed')
    );
...

【问题讨论】:

    标签: angular angular2-services rxjs5


    【解决方案1】:

    您可以使用 Rx.Subject 发出新的“get-fresh-data”事件,以便在使用 .switchMap() 调用 refresh() 时检索新数据。请参阅此示例如何执行此操作:

    function getData() {
      return Rx.Observable.of('retrieving new data')
        .timestamp()
        .delay(500);
    }
    
    // in this example i use an eventStream of clicks
    // you can use Rx.Subject() and manually .next() a new value
    // when somebody invokes .refresh()
    const refreshDataClickStream = Rx.Observable.fromEvent(document.getElementById('refresh_stream'), 'click');
    
    const dataStream = refreshDataClickStream
      .startWith('PAGE_LOAD') /* let the stream always first time fetching data */
      .switchMap(() => getData()) /* getData() is not cached so we switchMap to a new instance, abandoning the previous result*/
      .publishReplay().refCount(); /* refCounter so everybody gets the same results */
    
    dataStream.subscribe(data => console.log('sub1 data: ' + JSON.stringify(data)));
    
    setTimeout(() => {
      console.log('late arriving subscription (gets same stream)');
      dataStream.subscribe(data => console.log('sub2 data: ' + JSON.stringify(data)));
    }, 2000);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
    <input type='button' id='refresh_stream' value="refresh_stream" />

    【讨论】:

    • 这不是 Angular 的答案!!
    【解决方案2】:

    如果您可以利用缓冲区使用来自原始源的数据这一事实,并且缓冲数据的订阅者可以在接收到所有旧值后切换到原始源,则问题会变得更容易。

    例如。

    let data$ = new Subject<any>() // Data source
    
    let buffer$ = new ReplaySubject<any>() 
    let bs = data$.subscribe(buffer$)  // Buffer subscribes to data
    
    let getRepeater = () => {
       return concat(buffer$.pipe(
          takeUntil(data$), // Switch from buffer to original source when data comes in
        ), data$)
    }
    

    要清除,请更换缓冲区

    // Begin Buffer Clear Sequence
    bs.unsubscribe()
    buffer$.complete()
    
    buffer$ = new ReplaySubject()
    bs = data$.subscribe(buffer$)
    buffObs.next(buffer$)
    

    为了使代码更实用,您可以将函数 getRepeater() 替换为反映最新参考的主题

    let buffObs = new ReplaySubject<ReplaySubject<any>>(1)
    buffObs.next(buffer$)        
    
    let repeater$ = concat(buffObs.pipe(
       takeUntil(data$),
       switchMap((e) => e),                    
    ), data$)
    

    以下

        let data$ = new Subject<any>()
    
        let buffer$ = new ReplaySubject<any>()
        let bs = data$.subscribe(buffer$)         
    
        let buffObs = new ReplaySubject<ReplaySubject<any>>(1)
        buffObs.next(buffer$)        
    
        let repeater$ = concat(buffObs.pipe(
          takeUntil(data$),
          switchMap((e) => e),                    
        ), data$)
    
        // Begin Test
    
        data$.next(1)
        data$.next(2)
        data$.next(3)
    
        console.log('rep1 sub')
        let r1 = repeater$.subscribe((e) => {          
          console.log('rep1 ' + e)
        })
    
        // Begin Buffer Clear Sequence
        bs.unsubscribe()
        buffer$.complete()
    
        buffer$ = new ReplaySubject()
        bs = data$.subscribe(buffer$)
        buffObs.next(buffer$)
        // End Buffer Clear Sequence
    
        console.log('rep2 sub')
        let r2 = repeater$.subscribe((e) => {
          console.log('rep2 ' + e)
        })
    
        data$.next(4)
        data$.next(5)
        data$.next(6)
    
        r1.unsubscribe()
        r2.unsubscribe()
    
        data$.next(7)
        data$.next(8)
        data$.next(9)        
    
        console.log('rep3 sub')
        let r3 = repeater$.subscribe((e) => {
          console.log('rep3 ' + e)
        })
    

    输出

    rep1 子

    rep1 1

    rep1 2

    rep1 3

    rep2 子

    rep1 4

    rep2 4

    rep1 5

    rep2 5

    rep1 6

    rep2 6

    rep3 子

    rep3 4

    rep3 5

    rep3 6

    rep3 7

    rep3 8

    rep3 9

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-01-26
      • 2012-10-12
      • 1970-01-01
      • 1970-01-01
      • 2016-08-18
      • 2020-05-16
      • 2011-04-15
      相关资源
      最近更新 更多