【问题标题】:Wrap an Observable. Do something before and after each emitted value包装一个 Observable。在每个发出的值之前和之后做一些事情
【发布时间】:2018-08-31 14:35:09
【问题描述】:

我想构建一个包装类,它在 Observable 的每个发射值之前和之后执行一些操作。

这是我想出的:

class Wrapper<T> {
    wrapped$: Observable<T>;

    _dataSubject = new Subject<T>();
    data$ = this._dataSubject.pipe(
        tap(_ => console.log("BEFORE"),
        //
        // map( ??? )
        //
    );

    constructor(wrapped$: Observable<T>) {
        this.wrapped$ = wrapped$.pipe(
            tap(_ => console.log("AFTER")
        );
    }
}

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")

控制台输出应该是:

BEFORE
foo
AFTER

我不知道如何将$wrapped Observable 与_dataSubject 连接起来。

但也许我完全错了,它需要一种不同的方法。

【问题讨论】:

  • 您的代码有些奇怪,因为您声明了两个主题。您永远不会将 _dataSubject 中创建的那个与您当前的实现一起使用。因此,当您调用下一个主题时,什么都不会发生。你想实现什么用例?
  • 我知道我的代码根本不起作用,否则我不会在这里问问题:D 我只是想展示我想如何使用 Wrapper 类。虽然唯一真正重要的行是最后 4 行(let subject = ...subject.next("foo");

标签: typescript rxjs observable rxjs5 subject-observer


【解决方案1】:

感谢您的所有意见!

我现在想出了自己的解决方案。 AFTERDATA 输出的顺序错误,但我发现这并不重要,只要它们同时出现即可

代码仍然需要一些重构,但至少现在可以工作!

class Wrapper {
    _taskDoneSubject = new Subject<boolean>();
    taskDone$ = this._taskDoneSubject.asObservable();

    wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
        return switchMap<T, R>(val => { // can also be done using mergeMap
            this._taskDoneSubject.next(false);
            return wrapped(val).pipe(
                tap(_ => this._taskDoneSubject.next(true))
            );
        });
    }
}

用法:

test(valueEmittingObservable: Observable<string>) {

    let wrapper = new Wrapper();
    wrapper.taskDone$.subscribe(val => console.log("task done? ", val);

    valueEmittingObservable.pipe(
        // wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
        wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000)) // simulated
    ).subscribe(val => console.log("emitted value: ", val);

}

输出:

task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo

或者如果它的发射速度超过 5 秒:

task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo

【讨论】:

    【解决方案2】:

    最好的方法(虽然很复杂)是创建一个新的运算符,类似于tap,但它会在发出值之前和之后做一些事情。

    您可以在示例中看到它的工作原理(它在 ES6 中,因为 SO 代码 sn-ps 不接受 TypeScript,但您会明白的)

    function wrap(before, after) {
        return function wrapOperatorFunction(source) {
            return source.lift(new WrapOperator(before, after));
        };
    }
    class WrapOperator {
        constructor(before, after) {
            this.before = before;
            this.after = after;
        }
        call(subscriber, source) {
            return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
        }
    }
    
    class WrapSubscriber extends Rx.Subscriber {
        constructor(destination, before, after) {
            super(destination);
            this.before = before;
            this.after = after;
        }
        _next(value) {
            this.before ? this.before(value) : null;
            this.destination.next(value);
            this.after ? this.after(value) : null;
        }
    }
    
    // Now:
    
    const observable = Rx.Observable.from([1, 2, 3, 4]);
    
    observable.pipe(
       wrap(value => console.log('before', value), value => console.log('after', value))
    ).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
    
    
    // For what you want:
    // let's simulate that, for each value in the array, we'll fetch something from an external service:
    // we want the before to be executed when we make the request, and the after to be executed when it finishes. In this // case, we just combine two wrap operators and flatMap, for example:
    
    observable.pipe(
      wrap(value => console.log('BEFORE REQUEST', value)),
      Rx.operators.flatMap(value => {
        const subject = new Rx.Subject();
        setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
        return subject;
      }),
      wrap(undefined, value => console.log('AFTER REQUEST', value))
    ).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
    &lt;script src="https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js"&gt;&lt;/script&gt;

    如前所述,可能有点复杂,但它与 RxJS 运算符无缝集成,它始终是了解如何创建我们自己的运算符的好例子 :-)

    对于您在评论中所说的内容,您可以查看最后一个示例。在那里,我结合了两个 wrap 运算符。第一个仅使用before 回调,因此它仅在发出值之前执行某些操作。如您所见,因为源 observable 来自一个数组,所以四个 before 回调会立即执行。然后,我们申请flatMap。为此,我们应用了一个新的wrap,但这次只使用了after 回调。所以,这个回调只有在 flatMap 返回的 observables 产生它们的值之后才会被调用。

    当然,如果取而代之的是一个来自数组的 observable,你会从一个事件监听器中得到一个,你会有:

    1. before 回调在触发的事件被 observable 推送之前执行。
    2. 异步 observable 执行。
    3. 异步 observable 在时间 t 后产生值。
    4. after 回调被执行。

    这是让运营商获得回报的地方,因为它们很容易组合。希望这适合你。

    【讨论】:

    • 谢谢。与其他答案相比,看起来真的很复杂。但效果很好。
    • 唯一的缺点是before 将等待被执行,直到被包装的 Observable 发出一个值。虽然如果包装后的 observable 需要 5 秒,那么 before 将在 5 秒后 执行。如果它能够被包装的 observable 发出一个值之前被执行,那就太好了。
    • 不知道我是否理解。 before 在值发出之前执行,after 在之后执行。如果源 observable 被限制,它仍然会这样工作。不知道你到底是什么意思
    • 很难解释。我会尝试使用Subject。当您调用subject.next(...) 时,应该立即调用before 方法。现在,pipe 中可能有一个 HTTP 调用(或其他一些长时间运行的函数),例如使用 switchMap。当这个长时间运行的函数完成后,应该调用这个函数的输出和after 函数。结果将如下所示:1. BEFORE2.(延迟几秒钟,因为有一些长时间运行的函数),3. 长时间运行函数的结果 + AFTER
    • 知道了。检查我的修改。
    【解决方案3】:

    所以,据我了解,你可以这样做:

    class Wrapper<T> {
    
        _dataSubject: Subject<T>;
        wrapped$: Observable<T>;
    
        constructor(wrapped$: Subject<T>) {
            this._dataSubject = wrapped$;
            this.wrapped$ = this._dataSubject.pipe(
              tap(_ => console.log("BEFORE")),
              tap(data => console.log(data)),
              tap(_ => console.log("AFTER"))
            );
        }
    }
    

    然后:

    let subject = new Subject<string>();
    let wrapper = new Wrapper(subject);
    wrapper.wrapped$.subscribe();
    subject.next("foo")
    

    【讨论】:

    • 是的,这与 Picci 所做的基本相同。有效!
    • 为什么要撤消我的编辑?在pipe 中没有可以使用的asObservable 方法。另外pipe 已经返回了Observable 类型(没有Subject)。
    • 你是对的,它不在管道中,但它是一种方法。这是为了防止做wrapper.wrapped$.next('hello world'); 并且经过测试,它现在不再是一个主题所以你是对的^^
    • 我知道你想要实现什么,但仍然:pipe 返回一个Observable(即使你调用subject.pipe())。 pipe 函数是 Observable 类 ( github.com/ReactiveX/rxjs/blob/master/src/internal/… ) 的一部分,不是 Subject 类 ( github.com/ReactiveX/rxjs/blob/master/src/internal/Subject.ts ) 的一部分。虽然你的代码在使用 TypeScript 时会抛出错误。
    • 是的,你是对的,我刚刚在我当前的项目中测试了它。当我写我的答案时,我忘记了关于管道函数返回值的这一部分。我应该在发布答案时测试代码而不是做出假设:)
    【解决方案4】:

    这样的事情呢

    import {Observable} from 'rxjs';
    
    export class DoBeforeAfter<T> {
        wrapped$: Observable<T>;
    
    
        constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
            this.wrapped$ = Observable.of(null)
                .do(_ => console.log("BEFORE"))
                .switchMap(_ => wrapped$)
                .do(doFunction)
                .do(_ => console.log('AFTER'));
        }
    
    }
    

    这样吃

    const source = Observable.of('NOW');
    const beforeAfter = new DoBeforeAfter(source, data => console.log(data));
    
    beforeAfter.wrapped$.subscribe(
            null,
            error => console.error(error),
            () => console.log('DONE')
    

    )

    看起来有点麻烦,但也许能帮上忙

    【讨论】:

    • 你是个天才。这真的很容易。谢谢你。有时,只见树木不见森林是不可能的。 :D 唯一的悲剧是Observable.of(null)。看起来不是那么干净,但至少可以按预期工作。
    • 我知道深入树林的感觉
    猜你喜欢
    • 2022-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-12
    • 2021-09-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多