【问题标题】:rxjs and websocket - Do I need a heartbeat?rxjs 和 websocket - 我需要心跳吗?
【发布时间】:2021-11-20 13:31:56
【问题描述】:

我对 ReactiveX 和处理套接字事件还很陌生。我不断偶然发现的一件事是,在处理 websocket 时需要保持活动的心跳功能。

我在网上搜索时发现保持连接的原因可能包括以下原因:

  • 我们需要能够检测到服务器的连接是否中断
  • 我们需要保持连接打开,这可以通过心跳来完成。 (在我手动关闭连接之前,它不是一直打开吗?)

目前,我正在用rxjs实现这个,我即将实现一个心跳,但是我开始怀疑这是否真的有必要?

在查看 rxjs 库时,关闭事件可以通过 rxjs 套接字主题提供的观察者非常容易地处理,即:

interface WebSocketSubjectConfig<T> {
    url: string
    protocol?: string | Array<string>
    resultSelector?: (e: MessageEvent) => T
    serializer?: (value: T) => WebSocketMessage
    deserializer?: (e: MessageEvent) => T
    openObserver?: NextObserver<Event>
    closeObserver?: NextObserver<CloseEvent>
    closingObserver?: NextObserver<void>
    WebSocketCtor?: {...}
    binaryType?: 'blob' | 'arraybuffer'
}

因此,如果我遇到连接问题,我可以轻松地为 closeObserver 创建一个 Observable,它可以侦听任何关闭事件并随后触发重新连接循环。

我什至用我的本地 API 对此进行了测试,我可以看到每次在我的 API 中模拟“错误”时都会触发该事件。

因此我的问题是:

  • 我是否遗漏了什么,rxjs 是否已经为我处理了这个问题,并为我提供了 closeObserver 作为回报?
  • 如果我已有的似乎没问题,为什么还需要实现乒乓模式? (有没有我没有研究过的角落?)

【问题讨论】:

    标签: javascript websocket rxjs


    【解决方案1】:

    在我手动关闭连接之前,它不是一直打开吗?

    是的,没错。除非您手动关闭它,或者在此过程中出现错误,否则连接将保持打开状态。不需要心跳机制。

    我们需要能够检测到服务器的连接是否中断 向下

    这实际上是一个好问题,因为它可能不会立即显而易见。那么我们来看看WebSocketSubject的实际代码:

      /**
       * An Observer then watches when close events occur on the underlying webSocket
       */
      closeObserver?: NextObserver<CloseEvent>;
    

    所以当关闭事件发生时,总是会调用这个。然后是他们自己的例子:

     * **closeObserver** allows us to set a custom error when an error raise up.
     * import { webSocket } from 'rxjs/webSocket';
     *
     * const wsSubject = webSocket({
     *     url: 'ws://localhost:8081',
     *     closeObserver: {
            next(closeEvent) {
                const customError = { code: 6666, reason: "Custom evil reason" }
                console.log(`code: ${customError.code}, reason: ${customError.reason}`);
            }
        }
     * });
     *
     * //output
     * // code: 6666, reason: Custom evil reason
    

    这显示了他们如何使用closeObserver Obs 来隧道化 customError。但到目前为止的重点是,closeObserver 本身无法处理所有关闭场景/路径。

    但是如果我们向下看源文件,我们会看到:

        socket.onerror = (e: Event) => {
          this._resetState();
          observer.error(e);
        };
    
        socket.onclose = (e: CloseEvent) => {
          this._resetState();
          const { closeObserver } = this._config;
          if (closeObserver) {
            closeObserver.next(e);
          }
          if (e.wasClean) {
            observer.complete();
          } else {
            observer.error(e);
          }
        };
    

    const observer = this._output;this._output = new Subject&lt;T&gt;(); 的位置。所以我们可以看到,如果关闭事件不是一个干净的事件,那么实际的“核心”主题就会收到错误信号。

    这意味着,在内部,RxJS WS 会执行并公开捕获任何类型的服务器断开连接所需的一切,因此自定义心跳机制只是多余的。

    顺便说一句,根据我的个人经验,大多数 WS 客户端的行为都类似,只有服务器实现可能真正受益于自定义心跳系统(现在甚至可能一切都由库/框架处理,我不知道)。


    奖金

    使用 RxJS WS 的正确(并且更复杂)连接/重新连接流程的可能实现:

    this.wsSubject = webSocket(wsUrl);
    this.wsSubscription = this.wsSubject.pipe(
      retryWhen(errors =>
        errors.pipe(
          concatMap((error, i) =>
            iif(
              () => this.environment.webSockets.maxReconnectAttempts !== -1 &&
                i >= this.environment.webSockets.maxReconnectAttempts,
              throwError('WebSocket reconnecting retry limit exceeded!'),
              of(error).pipe(
                tap(() => {
                  this.disconnected = true;
                  this.log.warn('Trying to reconnect to WebSocket server...');
                }),
                delay(this.environment.webSockets.reconnectAttemptDelay)
              )
            )
          )
        )
      ),
      tap(() => {
        if (this.disconnected) {
          this.disconnected = false;
          this.log.info('Successfully re-connected to the WebSocket server.');
        }
      })
    ).subscribe(
      (data) => this.handleNotification(data),
      (err) => this.log.error(err),
      () => this.log.warn('Connection to the WebSocket server was closed!')
    );
    }
    

    【讨论】:

    • 非常感谢您的精心回答@Mihai
    猜你喜欢
    • 1970-01-01
    • 2021-01-23
    • 2010-10-24
    • 1970-01-01
    • 2010-10-26
    • 1970-01-01
    • 2017-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多