【问题标题】:In RxJS, does Observer get injected into Observable execution?在 RxJS 中,Observer 是否被注入到 Observable 执行中?
【发布时间】:2019-01-30 17:21:59
【问题描述】:

我已多次阅读ReactiveX 文档,但仍然无法完全理解当Observer 订阅Observable 时会发生什么。

我们来看一个简单的例子:

import { Observable } from 'rxjs'; 

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.complete();
});

const observer = {
  next: (x) => console.log('got value ' + x),
  error: (err) => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done')
};

observable.subscribe(observer);

StackBlitz code.


我的问题:

传递给Observablesubscriber对象从何而来?

来自RxJS documentation

observable.subscribesubscribenew Observable(function subscribe(subscriber) {...}) 具有相同的名称。 在图书馆中,它们是不同的,但出于实际目的,您可以 认为它们在概念上是相等的。

所以,显然传递到 Observable 构造函数 (subscriber) 中的 subscribe 回调的对象不是实际上是 observer目的。至少如果你引用上面关于库实际工作方式的引用,则不会。

如果传入的不是observer对象,那么subscriber.next(1)subscribe.complete()到底调用了什么?这如何连接到observer 中的next 属性?


澄清编辑:

我知道如何使用 RxJS 并且确实可以在概念上 想象 Observer 被注入(正如引文所说)。不过,我想了解它实际上是如何工作的。

【问题讨论】:

  • @ABOS 检查报价,这不是它在库中的工作方式。有点相关,这也不是 Promises 在内部构建的方式。传递给Promise 构造函数(执行器函数)中的回调的rejectresolve 函数实际上并非来自then(onResolve, onReject)。它们来自内部 Promise 实现,当被调用时,它们以某种方式执行then 中的处理程序(如果附加)。 developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/…
  • @ABOS 我的问题是它实际上是如何工作的。总的来说,我相信,通过深入了解某件事的工作原理,你会更好地应用它。向 OP 添加澄清编辑。
  • 将订阅者视为观察者的包装器,确保nextcompleteerrror 都存在,并且观察者回调抛出的任何错误都会被捕获并正确路由。订阅者本质上是一个实现细节:github.com/ReactiveX/rxjs/issues/2314
  • @cartant 谢谢。不订阅,是不是就好像我们的观察者被直接注入到了 Observable 构造函数中的回调中一样?
  • subscribe 方法的调用调用传递给Observable 构造函数的函数。该函数在内部存储为_subscribe,并使用传递给subscribe 方法here 的包装观察者调用 - 请注意包装观察者的toSubscriber 调用(就在其上方)。

标签: javascript rxjs reactivex


【解决方案1】:

Observable的创建流程如下:

Observable 由作者定义(此处手动使用new,为了便于说明):

const myObservable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  return function tearDownLogic() {
    console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
  }
});

上面传递给Observablesubscribe回调被Observable constructor保存在本地:

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

所以,我们拥有整个 subscribe 函数,由我们或任何其他预制的 Observable 定义,保存下来供以后执行。

Observer 可以以多种形式之一传递给subscribe 回调。要么直接作为一到三个函数(nexterrorcomplete),要么作为一个对象,具有相同的三个方法中的一个或多个.出于解释的目的,我们将实现最后一个更详细的选项:

const observer = {
  next(v) {
    console.log(v);
  }
  error(err) {
    console.log(err);
  }
  complete() {
    console.log('Observable has now completed and can no longer emit values to observer');
  }
}

现在,有趣的部分开始了。我们将observer 传递给Observable.subscribe(..) 方法:

myObserver.subscribe(observer);

subscribe method 看起来像这样:

  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {


    const { operator } = this;
    const sink = toSubscriber(observerOrNext, error, complete);


    if (operator) {
      sink.add(operator.call(sink, this.source));
    } else {
      sink.add(
        this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
        this._subscribe(sink) :
        this._trySubscribe(sink)
      );
    }


    if (config.useDeprecatedSynchronousErrorHandling) {
      if (sink.syncErrorThrowable) {
        sink.syncErrorThrowable = false;
        if (sink.syncErrorThrown) {
          throw sink.syncErrorValue;
        }
      }
    }


    return sink;
  }

简述,subscribe 方法:

  1. 以之前讨论的形式之一接收observer
  2. toSubscriber 将观察者转换为 Subscriber 对象,无论其传入形式如何(Subscriber 实例保存在 sink 变量中)
  3. 注意:operator 变量是 undefined,除非您订阅了运算符。因此,只需忽略 operator 周围的 if 语句
  4. Subscriber 扩展(与原型链接)Subscription 对象,该对象在其原型上有两个重要方法:unsubscribe()add()
  5. add(..) 用于将“拆卸逻辑”(函数)添加到Observable,当Observable完成取消订阅时将运行。它将获取传递给它的任何函数,将其包装在Subscription 对象中,并将该函数放入Subscription_unsubscribe 变量中。这个Subscription 保存在我们上面创建的Subscriber 中,在一个名为_subscriptions 的变量中。如前所述,我们这样做是为了当Subscriber取消订阅完成时,所有add()'ed 拆除逻辑执行
  6. 附带说明,Observable.subscribe() 返回Subscriber 实例。因此,您可以随时在其上调用mySubscriber.add( // some tear down logic) 以添加将在Observable 完成取消订阅 时执行的函数
  7. 现在包含一个重要部分:this._trySubscribe(sink) 运行(在 add() 内,作为参数)。 _trySubscribe(..) 是实际运行之前由Observable 构造函数保存的subscribe 回调的函数。重要的是,它传入sink(我们的新Subscriber 实例)作为Observable 回调的回调。换句话说,当Observable中的subscriber.next(1)执行时,我们实际上是在sinkSubscriber)实例中执行next(1)next()Subscriber的原型上)。

所以,这将带我到最后,现在。 toSubscribe 内部以及取消订阅 流程等还有更多细节,但这些超出了本问答的范围。

简而言之,为了回答标题中的问题,观察者确实被传递到Observable,只是在转换为统一的Subscriber对象之后。

希望这将在未来对其他人有所帮助。

【讨论】:

    【解决方案2】:

    不,观察者不会被注入到可观察对象中。

    AFAICT,混淆源于new Observable(...) 语法更像是一个低级工厂而不是一个有用的模式。

    它或多或少是被of(value1, value2, ..., valueN)from(enumeration)fromEvent(...) 等更直接的实现所使用的机制。

    这些方法是您应该关注的实际用例。

    在幕后,所有这些方法将某种同步或异步值或交互连接到可观察流的美妙世界。为此,他们以某种方式一个适当的观察者:他们生成项目并将它们放入流中。为此,他们使用了一个名为next 的函数。就像Observer 实现中的方法一样,因为实际上是以完全相同的方式调用的。

    具体可以看这里的subscribe方法的实现:

    https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts

    如果您想了解订阅期间实际发生的情况,我建议您实际查看代码。 但是,IMO,您应该在熟悉各种 Observable 创建函数后尝试。

    希望对你有帮助。

    【讨论】:

    • 感谢您的回答。我知道如何使用 RxJS 以及它在非技术意义上是如何工作的。这不是我想在这里得到的。我正在实现低级 Observable 构造函数来演示我的问题,即:这个过程实际上是如何在库中工作的。根据引用(参见 OP 和规范),为了简单起见,我们可以假设它确实是注入的,但正如它所说:那 不是图书馆确实有效。
    • 好吧,那么您应该阅读答案中链接的代码。但要准备好进入一段相当长的旅程。不阅读代码,你无法真正理解代码的实现细节。吞下那颗蓝色药丸!
    • 嘿嘿,你可能是对的。哪个药丸,哪个药丸。我们去兔子洞……:)
    • 现在完成并发布在下面。我浏览了每一行代码,以确保我完全理解它。谢谢指点。
    猜你喜欢
    • 2016-07-06
    • 1970-01-01
    • 2022-08-06
    • 2017-04-01
    • 2021-03-25
    • 1970-01-01
    • 2018-01-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多