【发布时间】:2018-08-16 11:05:27
【问题描述】:
我有一个Observable<Array<Observable<T>>>,我想将其映射到Observable<Array<T>>。
当一个新数组被发出时,内部的 observables 应该按如下方式取消订阅/订阅:
- 如果
Observable存在于先前数组和新/当前数组中,保留预先存在的订阅 - 如果
Observable不存在于先前的数组中但存在于新的/当前数组中,创建新订阅 - 如果
Observable存在于先前的数组中但不存在于新的/当前数组中,取消订阅预先存在的订阅
我希望在外部 observable 上使用switchMap 来实现这一点,然后将Array<Observable<T>> 传递给combineLatest。但是,switchMap 将在订阅新的内部Observable 之前取消其先前的内部Observable 的订阅,这意味着内部订阅不会按需要保留。
示例 (https://stackblitz.com/edit/typescript-b4wgr1)。给定代码:
import 'rxjs/Rx';
import { Observable } from 'rxjs';
const debugObservable = <T>(t$: Observable<T>, name: string) =>
new Observable<T>(observer => {
console.log(name, 'subscribe');
const subscription = t$.subscribe(observer);
return () => {
console.log(name, 'unsubscribe');
return subscription.unsubscribe();
};
});
const ofSingle = <T>(t: T) =>
new Observable<T>(observer => {
observer.next(t);
});
const observableOfArrayOfObservablesOfNumber = new Observable<
Array<Observable<number>>
>(observe => {
const keep = debugObservable(ofSingle(1), 'keep');
const remove = debugObservable(ofSingle(2), 'remove');
const add = debugObservable(ofSingle(3), 'add');
observe.next([keep, remove]);
setTimeout(() => {
observe.next([keep, add]);
}, 2000);
return () => {};
});
// The `switchMap` will unsubscribe to the previous inner observable *before* subscribing to the new
// inner observable.
const final$ = observableOfArrayOfObservablesOfNumber.switchMap(
arrayOfObservablesOfNumber => {
const observableOfArrayOfNumbers = Observable.combineLatest(
arrayOfObservablesOfNumber,
);
return debugObservable(
observableOfArrayOfNumbers,
'observableOfArrayOfNumbers',
);
},
);
final$.subscribe(x => console.log('final', x));
这会产生:
observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
keep unsubscribe <--- bad!
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
keep subscribe <--- bad!
add subscribe
final [1, 3]
然而,这正是我想要的:
observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
add subscribe
final [1, 3]
【问题讨论】:
-
您可以使用自定义运算符来执行此操作 - 类似于
combineLatest的工作,但会像您描述的那样交换可观察值和发出的值。您可以使用自定义运算符还是坚持将现有运算符组合起来? -
自定义没问题!
标签: rxjs observable