【问题标题】:Converting websocket events to observables将 websocket 事件转换为 observables
【发布时间】:2019-03-15 06:12:40
【问题描述】:

我正在使用ws library 创建一个 websocket 客户端来与(基于 JSON 的)websocket 服务器进行通信。每个请求都必须有一个唯一的 requestId,并且响应将具有相同的 requestId

我的代码的简化示例

let requestId = 0;

const WebSocket = require('ws');
const ws = new WebSocket('ws://www.host.com/path');

ws.on('message', (json) => {
  let obj = JSON.parse(json);

  if (obj.eventType === 'SomeEvent') {
     ws.send({
       requestId,
       command: "DoSomeStuff"
     });
  }
});

我想在我的文件中实现一个发送方法,该方法在内部调用ws.send(),但返回一个可观察对象,当 websocket 接收到 requestId 属性与 requestId 匹配的ws.on('message') 时,该方法会解析。

这样我就可以订阅 Observable 并将我的代码更改为

...
  if (obj.eventType === 'SomeEvent') {
     mySendFunction({
       requestId,
       command: "DoSomeStuff"
     }).subscribe(result => {
        // Do something with result here
     }
  }
...

另外 - 有时(很少)服务器不响应请求。在那种情况下,如果在给定的时间段内没有收到响应,有没有办法让 observable 超时?


我见过使用数组将 requestId 映射到 Promise 然后手动解析它们的实现,但我认为必须有一种更聪明的方法可以直接使用 RxJS 执行此操作。

有什么想法吗?

【问题讨论】:

  • 所以你要先拨打send,然后通过on等待回复?
  • 似乎 rxjs 5 和 6 有 websocket subject

标签: javascript node.js websocket rxjs observable


【解决方案1】:

您可以做的一个非常简单的事情就是使用new Observable()

const mySendFunction = options => new Observable(observer => {
  const handler = ws.on('message', json => {
    let obj = JSON.parse(json);

    if (obj.eventType === options.command) {
      observer.next(obj);
      observer.complete();
    }
  });

  ws.send(options);

  return () => {
    // ...cancel handler?
  };
});

【讨论】:

    猜你喜欢
    • 2021-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-04
    • 1970-01-01
    • 2016-03-30
    • 2015-12-27
    • 1970-01-01
    相关资源
    最近更新 更多