【问题标题】:How is RxJS operator "repeat" implemented?RxJS 操作符“repeat”是如何实现的?
【发布时间】:2018-09-08 09:46:17
【问题描述】:

我正在阅读来自herehere 的RxJS 代码,但它取决于库的许多其他部分,我无法提取其实现背后的想法。我很好奇它在内部是如何工作的。它怎么能从头开始重复Observable?例如,这里:

Rx.Observable.of(2, 4, 5, 8, 10)
    .map(num => {
        if(num % 2 !== 0) {
            throw new Error('Odd number');
        }
        return num;
    })
    //Several other operators later...
    .retry(3)
    .subscribe(
        num => console.log(num),
        err => console.log(err.message)
    );

它怎么知道Observable的开头?

【问题讨论】:

    标签: javascript typescript rxjs reactive-programming


    【解决方案1】:

    重新编辑的答案

    RxJS 当前版本的相关来源在这里:

    repeat 在这里定义:

    https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/repeat.ts

    export function repeat<T>(count: number = -1): MonoTypeOperatorFunction<T> {
      return (source: Observable<T>) => {
        if (count === 0) {
          return empty();
        } else if (count < 0) {
          return source.lift(new RepeatOperator(-1, source));
        } else {
          return source.lift(new RepeatOperator(count - 1, source));
        }
      };
    }
    
    class RepeatOperator<T> implements Operator<T, T> {
      constructor(private count: number,
                  private source: Observable<T>) {
      }
      call(subscriber: Subscriber<T>, source: any): TeardownLogic {
        return source.subscribe(new RepeatSubscriber(subscriber, this.count, this.source));
      }
    }
    
    class RepeatSubscriber<T> extends Subscriber<T> {
      constructor(destination: Subscriber<any>,
                  private count: number,
                  private source: Observable<T>) {
        super(destination);
      }
      complete() {
        if (!this.isStopped) {
          const { source, count } = this;
          if (count === 0) {
            return super.complete();
          } else if (count > -1) {
            this.count = count - 1;
          }
          source.subscribe(this._unsubscribeAndRecycle());
        }
      }
    }
    

    这里定义了lift 函数:

    https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts

      lift<R>(operator: Operator<T, R>): Observable<R> {
        const observable = new Observable<R>();
        observable.source = this;
        observable.operator = operator;
        return observable;
      }
    

    结论(根据我的理解,请不要犹豫评论/纠正我)

    repeat(n) 只是递归地创建同一个 observable 的副本;它只是在前一个副本完成时重新订阅下一个副本。当最后一个事件完成时,它会发出一个“完成”事件。


    原答案

    简单回答您的基本问题:

    它怎么知道 Observable 的开始?

    无需深入细节,因为它“包装”/“作用于”所有先前的 Observable,可以这么说。

    实际上,每个操作员都这样做! (作用于他们身后的整个链条)

    不要忘记,当你链接/管道操作符时,你会得到一个新的 Observable,它对应于从头开始的整个操作。

    Observable 上的运算符返回一个新的 Observable,因此任何新应用的运算符都将应用于这个新的“更大”的 Observable


    Observable1.map(...) 返回一个新的 Observable2 对应整个流。

    Observable2.repeat() 重复整个过程。


    编辑

    我不是很精通,如果我错了,请提前原谅我,但我想实现的核心在这里:

    https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/repeatproto.js

    这是:

    observableProto.repeat = function (repeatCount) {
      return enumerableRepeat(this, repeatCount).concat();
    };
    

    这似乎意味着它基本上是concat n 倍于相同的可观察对象。

    repeat(3) 将是 concat 自身的 2 倍相同的 observable。

    let repeated$ = Observable.from([1,2,3]).map(x => x*2).repeat(3);
    

    等价于

    let myObs$ = Observable.from([1,2,3]).map(x => x*2);
    let repeated$ = myObs.concat(myObs).concat(myObs);
    

    【讨论】:

    • 你能用一些代码展示你的想法吗?查看this关于Observable实现的帖子,特别是map的实现。我认为这是 RxJS 运算符的一般工作方式,它们返回的 Observable 并不适用于整个序列(它是一个新序列)。但这种方法不适用于repeat
    • @amedina 我已经更新了我的答案,并提供了指向源代码中可能更相关部分的链接(虽然不完全确定)。但对我来说,repeat(n) 基本上是 concat n-1 次,具有相同的 observable。
    • 另外,你和我在这里链接的 repo 似乎不是当前的。见github.com/ReactiveX/rxjs/tree/master/src
    • 是的,我知道我在链接错误的仓库时犯了一个错误。我读了你的回答,现在我明白了它是如何工作的。谢谢;)
    猜你喜欢
    • 2016-05-10
    • 2021-11-25
    • 1970-01-01
    • 2021-04-21
    • 1970-01-01
    • 2016-11-06
    • 2021-12-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多