重新编辑的答案
RxJS 当前版本的相关来源在这里:
repeat 在这里定义:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/repeat.ts
export function repeat<T>(count: number = -1): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
if (count === 0) {
return empty();
} else if (count < 0) {
return source.lift(new RepeatOperator(-1, source));
} else {
return source.lift(new RepeatOperator(count - 1, source));
}
};
}
class RepeatOperator<T> implements Operator<T, T> {
constructor(private count: number,
private source: Observable<T>) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RepeatSubscriber(subscriber, this.count, this.source));
}
}
class RepeatSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<any>,
private count: number,
private source: Observable<T>) {
super(destination);
}
complete() {
if (!this.isStopped) {
const { source, count } = this;
if (count === 0) {
return super.complete();
} else if (count > -1) {
this.count = count - 1;
}
source.subscribe(this._unsubscribeAndRecycle());
}
}
}
这里定义了lift 函数:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
结论(根据我的理解,请不要犹豫评论/纠正我)
repeat(n) 只是递归地创建同一个 observable 的副本;它只是在前一个副本完成时重新订阅下一个副本。当最后一个事件完成时,它会发出一个“完成”事件。
原答案
简单回答您的基本问题:
它怎么知道 Observable 的开始?
无需深入细节,因为它“包装”/“作用于”所有先前的 Observable,可以这么说。
实际上,每个操作员都这样做! (作用于他们身后的整个链条)
不要忘记,当你链接/管道操作符时,你会得到一个新的 Observable,它对应于从头开始的整个操作。
Observable 上的运算符返回一个新的 Observable,因此任何新应用的运算符都将应用于这个新的“更大”的 Observable
Observable1.map(...) 返回一个新的 Observable2 对应整个流。
Observable2.repeat() 重复整个过程。
编辑
我不是很精通,如果我错了,请提前原谅我,但我想实现的核心在这里:
https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/repeatproto.js
这是:
observableProto.repeat = function (repeatCount) {
return enumerableRepeat(this, repeatCount).concat();
};
这似乎意味着它基本上是concat n 倍于相同的可观察对象。
repeat(3) 将是 concat 自身的 2 倍相同的 observable。
let repeated$ = Observable.from([1,2,3]).map(x => x*2).repeat(3);
等价于
let myObs$ = Observable.from([1,2,3]).map(x => x*2);
let repeated$ = myObs.concat(myObs).concat(myObs);