【问题标题】:RxJs: take action on first and last subscriber to observableRxJs:对 observable 的第一个和最后一个订阅者采取行动
【发布时间】:2017-01-14 06:07:46
【问题描述】:

在我的 Angular 2.x 应用程序中,我有一个订阅服务公开的可观察对象的组件。可观察数据服务目前是通过轮询 HTTP 请求实现的,但这将更改为响应式 WS 实现。我想保持对组件的轮询,所以问题是:一旦我有订阅者,我该如何采取行动,比如通过setInterval 进行 HTTP 轮询?而且,当没有更多订阅者时,我该如何采取行动,例如 clearInterval

视图是异步更新的:

  <div *ngFor="let headline of headlines$ | async">
    <md-list-item (click)="onSelect(headline)">
      <h4 md-line>{{headline.title}}</h4>
    </md-list-item>
    <md-divider></md-divider>
  </div>

组件从服务中获取可观察的标题:

export class HeadlinesComponent implements OnInit, OnDestroy {
  constructor(
      private _headlineService: HeadlineService,
  ) {
  }

  headlines$: Observable<Headline[]>;

  ngOnInit() {
    this.headlines$ = this._headlineService.headlines$;
  }
}

并且该服务被实现为可观察的数据存储:

@Injectable()
export class HeadlineService {
  constructor(
    private _http: Http) {
    _init();
  }

  private _headlines$: BehaviorSubject<Headline[]> = 
    new BehaviorSubject<Headline[]>([]);

  get headlines$(): Observable<Headline[]> {
    return this._headlines$.asObservable();
  }

  private _store: {
    headlines: Headline[]
  } = { headlines: [] };

  private _save(headlines: Headline[]) {
    this._store.headlines = headlines;
    this._headlines$.next(Object.assign({}, this._store).headlines);
  }
}

商店通过 HTTP 更新:

  private _interval;

  private _init() {
    let self = this;
    this._interval = setInterval(function(){ self._loadHeadlines();}, 5000);
  }

  private _loadHeadlines(): Observable<Headline[]> {
    let self = this;
    let observable: Observable<Headline[]> = this._http
      .get(url, options)
      .map(response => response.json()._embedded.headlines)
      .share();

    observable.subscribe(
      headlines => self._save(headlines)
      );    

    return observable;
  }

实施后,标题服务开始轮询构建,而不是延迟到订阅者出现。为了使生命周期正确,我可以公开服务的_loadHeadlines 功能并在触发组件的ngOnInit 时开始轮询(并在ngOnDestroy 中停止)但这会将服务实现细节暴露给组件(这些细节将在我转向网络套接字实现),我需要准确计算订阅者的数量。

那么,有没有很好的 RxJs 方式来解决这个问题?

【问题讨论】:

    标签: angular rxjs5


    【解决方案1】:

    Observables 是惰性的,只有在订阅时才开始工作。当您停止收听时,他们会取消订阅并进行清理。

    您正在寻找的是多播行为,在第一次订阅时引用 init 并且只有在没有人再订阅清理时才进行。这是使用.share() 运算符完成的。

    【讨论】:

      【解决方案2】:

      我没有在 Angular 中使用 RxJS,但从您的示例代码看来,您可以 .create 一个可观察的对象,它开始轮询间隔并返回将在取消订阅时调用的清除间隔函数。

      /*
          Increment value every 1s, emit even numbers.
      */
      const evenNumbers = Rx.Observable.create(function(observer) {
          let value = 0;
          const interval = setInterval(() => {
              if(value % 2 === 0){
                  observer.next(value);
              }
              value++;
          }, 1000);
      
          return () => clearInterval(interval);
      });
      //output: 0...2...4...6...8
      const subscribe = evenNumbers.subscribe(val => console.log(val));
      //unsubscribe after 10 seconds
      setTimeout(() => {
          subscribe.unsubscribe();
      }, 10000);
      

      上面的例子来自learnrxjs.io

      JSBin Example

      【讨论】:

      • 感谢@subhaze 的注释和指向 learn-rxjs 的指针。在这种情况下,时间间隔从第一个订阅者开始,到最后一个订阅者结束。我可以为最后一个订阅者使用完成处理程序,但是知道如何在第一个订阅者上触发操作吗?
      • 我不确定我是否完全遵循,但这里是一个合并一个 observable 的例子,它只在第一次发射时触发 jsbin.com/bewedehoze/1/edit?js,console
      猜你喜欢
      • 1970-01-01
      • 2019-09-15
      • 2020-06-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多