【问题标题】:How can I create a class that extends Rx.Observable in TypeScript?如何在 TypeScript 中创建一个扩展 Rx.Observable 的类?
【发布时间】:2015-08-10 06:16:59
【问题描述】:

我已经使用了最新版本的 rx.js,从“绝对类型”中输入。

当我尝试这个时:

class MyObservable extends Rx.Observable<any> { }

我得到了:A class may only extend another class.

为什么将ObservableSubject 等定义为rx.d.ts 中类的接口实例?

如果我想创建一个扩展 Observable 或 Subject 的类,我该怎么做?

附:我想让这个类处理特定的领域逻辑,所以我需要创建一个新类,而不是直接更新 Observable 的原型。

谢谢!

【问题讨论】:

  • 有许多类型的可观察对象,它们都实现了相同的接口。
  • 是的,但我不能在 typescript 中使用。我认为问题出在rx.d.ts 上。是不是应该自己重写?
  • 对 Typescript 不是很熟悉,但不是implement 与类接口,而不是extend 它们吗?
  • 当我尝试implement Observable 接口时,它告诉我Class 'MyOb' incorrectly implements interface 'Observable&lt;any&gt;'. Property 'forEach' is missing in type 'MyOb'.

标签: typescript rxjs


【解决方案1】:

rx.js 中的底层 Observable 无法扩展,因为它更像 TypeScript module 而不是 class(即它是一个单例)。

 var Observable = Rx.Observable = (function () {
     //...
 })();

这就是为什么它在绝对类型定义中被建模为一个接口,而不是一个类。要实现接口,您必须提供与接口兼容的结构。这是IObservable&lt;T&gt; 的示例。

class MyObservable<T> implements Rx.IObservable<T> {
    subscribe(observer: Rx.Observer<T>): Rx.IDisposable;
    subscribe(onNext?: (value: T) => void, onError?: (exception: any) => void, onCompleted?: () => void): Rx.IDisposable;
    subscribe(a?: Rx.IObserver<T> | Function, onError?: (exception: any) => void, onCompleted?: () => void) {
        return null;
    }

    subscribeOnNext(onNext: (value: T) => void, thisArg?: any): Rx.IDisposable {
        return null;
    }

    subscribeOnError(onError: (exception: any) => void, thisArg?: any): Rx.IDisposable {
        return null;
    }

    subscribeOnCompleted(onCompleted: () => void, thisArg?: any): Rx.IDisposable {
        return null;
    }
}

【讨论】:

  • 不幸的是,这种方法只在理论上可行,史蒂夫。这是因为绝大多数 Rx Operators 是围绕 Rx.Observable 接口(从 Rx.IObservable 派生的)定义的。这意味着您需要实现大量的方法(基本上所有方法)。
【解决方案2】:

我必须为WebRx 解决同样的问题。正如您已经发现的那样,使用 Typescript 类扩展 RxJS 的 IObservable 不是一种选择,因为 Observable 正在作为接口导出。正如我在对史蒂夫芬顿的回答的评论中提到的那样,创建一个实现 Rx.IObservable 的类也不会让你走得太远,因为绝大多数 Rx 运算符都是围绕从 Rx 派生的 Rx.Observable 接口定义的。 IO 可观察的。你最终会重写 Rx.Observable。

在找到更好的方法之前,我解决问题的方法是使用 prototypal inheritance 扩展内置 Rx.Observable 并通过 custom d.ts 文件导出扩展:

RxExtension.ts

var RxObsConstructor = (<any> Rx.Observable);   // this hack is neccessary because the .d.ts for RxJs declares Observable as an interface)

/**
* Creates an read-only observable property with an optional default value from the current (this) observable
* (Note: This is the equivalent to Knockout's ko.computed)
* @param {T} initialValue? Optional initial value, valid until the observable produces a value
*/
RxObsConstructor.prototype.toProperty = function(initialValue?: any, scheduler?: Rx.IScheduler) {
    scheduler = scheduler || Rx.Scheduler.currentThread;

    // initialize accessor function (read-only)
    var accessor: any = (newVal?: any): any => {
        if (arguments.length > 0) {
            internal.throwError("attempt to write to a read-only observable property");
        }

        if (accessor.sub == null) {
            accessor.sub = accessor._source.connect();
        }

        return accessor.value;
    };

    //////////////////////////////////
    // IUnknown implementation

    accessor.queryInterface = (iid: string) => {
        if (iid === IID.IUnknown ||
            iid === IID.IObservableProperty ||
            iid === IID.IDisposable)
            return true;

        return false;
    };

    //////////////////////////////////
    // IDisposable implementation

    accessor.dispose = () => {
        if (accessor.sub) {
            accessor.sub.dispose();
            accessor.sub = null;
        }
    };

    //////////////////////////////////
    // IObservableProperty<T> implementation

    accessor.value = initialValue;

    // setup observables
    accessor.changedSubject = new Rx.Subject<any>();
    accessor.changed = accessor.changedSubject
        .publish()
        .refCount();

    accessor.changingSubject = new Rx.Subject<any>();
    accessor.changing = accessor.changingSubject
        .publish()
        .refCount();

    accessor.source = this;
    accessor.thrownExceptions = internal.createScheduledSubject<Error>(scheduler, app.defaultExceptionHandler);

    //////////////////////////////////
    // implementation

    var firedInitial = false;

    accessor.sub = this
        .distinctUntilChanged()
        .subscribe(x => {
            // Suppress a non-change between initialValue and the first value
            // from a Subscribe
            if (firedInitial && x === accessor.value) {
                return;
            }

            firedInitial = true;

            accessor.changingSubject.onNext(x);
            accessor.value = x;
            accessor.changedSubject.onNext(x);
        }, x=> accessor.thrownExceptions.onNext(x));

    return accessor;
}

RxExtension.d.ts

declare module Rx {
    export interface Observable<T> extends IObservable<T> {
        toProperty(initialValue?: T): wx.IObservableProperty<T>;
    }
}

【讨论】:

  • 谢谢。这可以用作临时解决方案。
猜你喜欢
  • 2016-10-21
  • 2014-07-23
  • 1970-01-01
  • 1970-01-01
  • 2018-06-11
  • 1970-01-01
  • 2015-04-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多