【问题标题】:RxJs: how to maintain one WebSocket connection across multiple components?RxJs:如何跨多个组件维护一个 WebSocket 连接?
【发布时间】:2021-10-15 23:20:00
【问题描述】:

我有一个由多个组件组成的聊天应用程序(例如,一个组件包含组列表,另一个包含消息等),当一个组件打开时,另一个组件必须关闭。

所以这是我用来提供 webSocket 连接的服务:

@Injectable()
export class ChatConnectionService {

  private _connection: Subject<any>;

  constructor(private window: WindowAbstract) { }

  public closeConnection(): void {
    this.connection.complete();
  }

  public send(msg: ChatRequest): void {
    this.connection.next(msg);
  }

  public messages(): Subject<any> {
    return this.connection;
  }

  private get connection(): Subject<any> {
    if (!this._connection || this._connection.closed) {
      this._connection = webSocket(`https/myapp/chat`);
    }

    return this._connection;
  }
}

这是some.component.ts中的一个用法示例:

  private messages$: Subscription;

  ngOnInit(): void {
    this.messages$ = this.chatConnectionService.messages()
      .subscribe(m => this.handleInboundMessage(m));
  }

  ngOnDestroy() {
    this.messages$.unsubscribe();
  }

现在的问题是,当我在组件之间切换时,一旦旧组件被销毁时取消订阅,新订阅似乎不起作用,它不会发送或接收消息,也不会产生任何错误.

但是当我删除取消订阅行时,它开始工作,但也继续使用旧订阅接收消息,这不是我真正想要的。

总结一下——我需要维护一个连接,但有可能让多个组件根据需要订阅和取消订阅,是否可以使用 RxJs 来完成它?或者我做错了什么?任何建议将不胜感激,谢谢!

【问题讨论】:

  • 阅读速度很快,可能无济于事,但可能使服务成为单身,或者共享重播可能会有所帮助
  • 您确定您的组件实际上共享同一个服务实例吗?当您在 Angular 中为依赖注入声明服务时,您必须指定服务的“范围”。我在 Injectable 装饰器中看不到providedIn: 'root',这是声明全局服务的常用方法。因此,您的组件可能都有自己的私有服务实例。这并不能解释整个问题,但无论如何,确保在所有组件之间共享一个服务实例很重要。
  • 我确定,它是在包含组件的模块中声明的
  • 你能创建一个最小的 repo 吗?
  • 您在ChatConnectionService 的哪个位置实例化了connection

标签: angular websocket rxjs rxjs6


【解决方案1】:

问题是您正在为组件提供对从webSocket('https/myapp/chat') 返回的WebSocketSubject 的直接引用。如果你在WebSocketSubject 上调用.unsubscribe() 方法,它会关闭通道,这就是为什么没有其他组件检测到消息的原因。

与其做 getter,不如声明一个从私有 WebSocketSubject 管道传输的公共 observable。

@Injectable()
export class ChatConnectionService {

  public messages$ = Observable<any>; // Replace 'any' with WS response type.
  private connection: WebSocketSubject;

  constructor(private window: WindowAbstract) { 
    this.connection = webSocket(`https/myapp/chat`);
    this.messages$ = this.connection.asObservable();
  }

}

您的组件现在将从 observable 退订,而不是调用 WebSocketSubject 退订频道。


更新 在进一步查看由webSocket() 实例化的WebSocketSubject 之后,我意识到.asObservable() 并不是适合您的情况的最佳用例。即使您的组件取消订阅 observable,Subject 仍会从开放的 WS 通道接收新数据。

this.messages$ = this.connection.multiplex(
  () => { 'send message to WS server to start' }, 
  () => { 'send message to WS server to stop' }, 
  (message) => true // filter messages by condition
);

您可以使用multiplex() 方法从您的WebSocketSubject 声明一个可观察对象。它需要三个参数:

  1. 声明 WS 启动消息的函数。
  2. 声明 WS 结束消息的函数。
  3. 过滤消息的功能

现在,每当任何组件订阅messages$,它都会触发启动消息,为订阅者打开通道。当所有 observable 取消订阅 messages$ 时,它会发送关闭信号以停止任何进一步的消息。

这为您的 WS 维护单例状态节省了大量样板。

参考:https://rxjs.dev/api/webSocket/WebSocketSubject

【讨论】:

  • 嗨@digclo!只是想给你一个反馈。关于多路复用建议,我认为您误解了它的用途 - 当您通过单个 websocket 连接运行不同的通道时。但我的情况是当我只有一个频道和多个订阅者时。我也试过你最初的建议,它也没有任何区别。但到目前为止我发现,如果你一直保持至少一个订阅处于活动状态,当另一个组件取消订阅时它不会关闭连接,这可能与 websockets 的 rxjs 实现有关
  • 更简单的方法是定义this.messages$ = this.connection.asObservable()。我尝试使用 multiplex() 的目的是防止您在应用处于不需要该数据的某种状态时在后台运行活动的 websocket。
【解决方案2】:

我会把它留给那些最终处于同样情况的人。

您需要至少有一个订阅一直在运行,以便在您取消订阅时它不会关闭连接。

这就是我所做的:

@NgModule({
  imports: [
     ...
  ],
  providers: [
    {
        provide: APP_INITIALIZER,
        multi: true,
        // subscribing establishes a connections
        useFactory: (chat: ChatConnectionService) => () => chat.messages$.subscribe(),
        deps: [ChatConnectionService]
    }
  ]
})

export class AppModule {
}

这里不那么明显的陷阱是,从 WebSocketSubjectObservable 取消订阅(通过订阅主题创建)具有相同的效果 - 如果它是最后一次订阅,它会关闭活动连接。

【讨论】:

    猜你喜欢
    • 2021-03-15
    • 2017-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-04
    • 2014-01-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多