【问题标题】:`combineLatest`, `switchMap` and retaining inner subscriptions`combineLatest`、`switchMap` 和保留内部订阅
【发布时间】:2018-08-16 11:05:27
【问题描述】:

我有一个Observable<Array<Observable<T>>>,我想将其映射到Observable<Array<T>>

当一个新数组被发出时,内部的 observables 应该按如下方式取消订阅/订阅:

  • 如果Observable 存在于先前数组和新/当前数组中,保留预先存在的订阅
  • 如果Observable 不存在于先前的数组中但存在于新的/当前数组中,创建新订阅
  • 如果 Observable 存在于先前的数组中但不存在于新的/当前数组中,取消订阅预先存在的订阅

我希望在外部 observable 上使用switchMap 来实现这一点,然后将Array<Observable<T>> 传递给combineLatest。但是,switchMap 将在订阅新的内部Observable 之前取消其先前的内部Observable 的订阅,这意味着内部订阅不会按需要保留。

示例 (https://stackblitz.com/edit/typescript-b4wgr1)。给定代码:

import 'rxjs/Rx';
import { Observable } from 'rxjs';

const debugObservable = <T>(t$: Observable<T>, name: string) =>
    new Observable<T>(observer => {
        console.log(name, 'subscribe');
        const subscription = t$.subscribe(observer);
        return () => {
            console.log(name, 'unsubscribe');
            return subscription.unsubscribe();
        };
    });

const ofSingle = <T>(t: T) =>
    new Observable<T>(observer => {
        observer.next(t);
    });

const observableOfArrayOfObservablesOfNumber = new Observable<
    Array<Observable<number>>
>(observe => {
    const keep = debugObservable(ofSingle(1), 'keep');
    const remove = debugObservable(ofSingle(2), 'remove');
    const add = debugObservable(ofSingle(3), 'add');

    observe.next([keep, remove]);

    setTimeout(() => {
        observe.next([keep, add]);
    }, 2000);

    return () => {};
});

// The `switchMap` will unsubscribe to the previous inner observable *before* subscribing to the new
// inner observable.
const final$ = observableOfArrayOfObservablesOfNumber.switchMap(
    arrayOfObservablesOfNumber => {
        const observableOfArrayOfNumbers = Observable.combineLatest(
            arrayOfObservablesOfNumber,
        );
        return debugObservable(
            observableOfArrayOfNumbers,
            'observableOfArrayOfNumbers',
        );
    },
);

final$.subscribe(x => console.log('final', x));

这会产生:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
keep unsubscribe <--- bad!
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
keep subscribe <--- bad!
add subscribe
final [1, 3]

然而,这正是我想要的:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
add subscribe
final [1, 3]

【问题讨论】:

  • 您可以使用自定义运算符来执行此操作 - 类似于 combineLatest 的工作,但会像您描述的那样交换可观察值和发出的值。您可以使用自定义运算符还是坚持将现有运算符组合起来?
  • 自定义没问题!

标签: rxjs observable


【解决方案1】:

我最终通过使用 publishReplay(1) 发布 + 重播内部 observables 然后引用计数来实现这一点。

注意refCount 是不够的,因为当switchMap 取消订阅前一个内部可观察对象(在它订阅新的内部可观察对象之前)时,计数将下降到0,所以我不得不使用特殊的@987654329 @ 运算符仅在延迟后通过引用计数取消订阅(即在事件循环的同一滴答声内但不同步)。更多信息在这里:

https://stackblitz.com/edit/typescript-4xfwsh?file=index.ts

const createObservable = <T>(t: T, name: string) => {
  return refCountWithDelay(debugObservable(ofSingle(t), name).publishReplay(1), 0, 0);
}

const observableOfArrayOfObservablesOfNumber = new Observable<
    Array<Observable<number>>
>(observe => {
    const keep = createObservable(1, 'keep');
    const remove = createObservable(2, 'remove');
    const add = createObservable(3, 'add');

    observe.next([keep, remove]);

    setTimeout(() => {
        observe.next([keep, add]);
    }, 2000);

    return () => {};
});

生产:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
remove unsubscribe
add subscribe
final [1, 3]

注意keep 只订阅一次。

【讨论】:

    【解决方案2】:

    我想出了一个更好的解决方案,使用来自rxjs-etccombineLatestHigherOrderhttps://github.com/cartant/rxjs-etc

    https://stackblitz.com/edit/typescript-hfze6m?file=index.ts

    【讨论】:

      【解决方案3】:

      与您描述的最接近的是 Cycle.js Onionify 中名为 pickCombine 的 xstream 运算符。

      似乎没有一个官方的 RxJS 操作符可以解决这个问题,但是可以构建自己的操作符来实现这种行为。你可以参考pickCombine的xstream实现。

      关键部分是:

      请注意,创建custom data structure(它使用 Map 并依赖键来消除数组项的歧义)比直接在数组上创建更容易且更有效。您可以对外部 API 隐藏自定义数据结构。

      【讨论】:

      • 这看起来很有希望,虽然我有点坚持将它作为一个 RxJS 操作符来实现......
      猜你喜欢
      • 2021-03-13
      • 1970-01-01
      • 2019-03-28
      • 2020-01-23
      • 2023-03-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-25
      相关资源
      最近更新 更多