【问题标题】:How to make Observers wait for creation of WebSocket after authentication如何让观察者在认证后等待 WebSocket 的创建
【发布时间】:2017-09-13 15:39:43
【问题描述】:

我正在使用 RxJs 来监听来自 websocket 的数据流。为了使用这个 pubsub 套接字,我需要首先在另一个套接字上进行身份验证,该套接字将返回 pubsub 套接字 url。

我现在只有一个订阅者才能完成这项工作。我需要添加另一个,但我当前的方法存在问题,因为两个订阅者都会触发身份验证并创建第二个 pubsub 套接字。

需要明确的是,整个应用程序应该只有一个身份验证套接字和一个 pubsub 套接字。但是我需要订阅者在尝试使用 pubsub 套接字之前“等待”进行身份验证。否则,pubsub 套接字将是未定义的(因为我们只知道它是运行时的 url)。

这是我目前的尝试:

Websocket.service.ts

    private connect(): Observable<IConnectionInfo> {

    if (!this.authObservable) {
      var credentials = {
        "username": "bob",
        "password": "slob"
      }

      this.authObservable = Observable.create((observer) => {
        const socket = new WebSocket(this.authurl);
        socket.onopen = () => {
          console.log("Auth socket opened");
          socket.send(JSON.stringify(credentials));
        }
        socket.onmessage = (event) => {
          var connection_info = <IConnectionInfo>JSON.parse(event.data);
          observer.next(connection_info);
        }

        return () => {
          socket.close(); //invoked on unsubscribe
          console.log("Auth socket closed");
        }
      })
    }
    return this.authObservable;
  }

public open_pubsub(events: string[]): Observable<IPubSubMessage> {

    return this.connect()
      .flatMap((connection_info: IConnectionInfo) => {
        var url = "ws://" + this.hostName + ":" + connection_info.port + "/" + connection_info.ps;

        var subscription = {
          "subscribe": events
        }

        var authenticate_request = {
          "authenticate": connection_info['token']
        }

        if (!this.psObservable) {
          this.psObservable = Observable.create((observer) => {
            const socket = new WebSocket(url);

            socket.onopen = () => {
              console.log("PS socket opened");
              socket.send(JSON.stringify(authenticate_request));
              socket.send(JSON.stringify(subscription));
            }
            socket.onmessage = (event) => {
              var psmsg = <IPubSubMessage>JSON.parse(event.data);
              observer.next(psmsg);
            }

            return () => {
              socket.close(); //invoked on unsubscribe
              console.log("PS socked closed");
            }
          })
        }
        return this.psObservable;
      }
      );
  }

还有观察者:

getSystemState(): Observable<string> {

    return this._wsService.open_pubsub([MSG_SYSTEM_STATE_CHANGED])
        .map((response: IPubSubMessage): string => {

                console.log(response.payload);
                return "I wish this worked";
        })
        .catch(this.handleError);
}

任何帮助表示赞赏!

编辑基于一个看似已被删除但确实非常有用的答案,我修改了代码。这解决了部分问题,但仍然导致多个套接字)

【问题讨论】:

    标签: angular typescript authentication websocket rxjs


    【解决方案1】:

    找到了我所缺少的东西——share() 运算符的魔力。

    所以对于两个 Observable,我都这样做:

    this.authObservable = Observable.create((observer) => {
           ...(etc)...
          }).share();
    

    令人惊讶的是,每个套接字只创建了 1 个。

    【讨论】:

      猜你喜欢
      • 2021-12-30
      • 2023-04-10
      • 1970-01-01
      • 2019-08-20
      • 2020-03-08
      • 1970-01-01
      • 2016-01-17
      • 2016-02-20
      • 2011-06-18
      相关资源
      最近更新 更多