【问题标题】:How to use RxJs with Socket.IO on event如何在事件中使用带有 Socket.IO 的 RxJs
【发布时间】:2023-03-31 12:47:01
【问题描述】:

我想在我的socket.on('sense',function(data){}); 中使用 RxJS。我对可用的文档很少以及对 RxJS 缺乏理解感到困惑和困惑。这是我的问题。

我有一个distSensor.js,它有一个函数 pingEnd()

function pingEnd(x){
socket.emit("sense", dist); //pingEnd is fired when an Interrupt is generated.
}

在我的 App.js 中

io.on('connection', function (socket) {
    socket.on('sense', function (data) {
        //console.log('sense from App4 was called ' + data);
    });
});

sense 函数获取了很多我想使用 RxJS 过滤的传感器数据,但我不知道接下来应该做什么才能在这里使用 RxJs。任何指向正确文档或示例的指针都会有所帮助。

【问题讨论】:

    标签: javascript sockets rxjs


    【解决方案1】:

    你可以像这样创建一个 Observable:

    var senses = Rx.Observable.fromEventPattern(
        function add (h) {
          socket.on('sense',h);
        }
      );
    

    然后像任何其他 Observable 一样使用senses

    【讨论】:

    • add(h)方法的作用是什么?该功能是 RxJs 的一部分吗?
    • 我只是为函数命名,您只需:function(h){...}
    • fromEvent 对我不起作用。而我最终得到了这个解决方案。在 TS 中:let obs = Observable.fromEventPattern(h => this.socket.on('my_event', h));
    • 只用fromEvent就行了,不用多写代码! const sense$ = Rx.Observable.fromEvent(socket, 'sense');
    【解决方案2】:

    您可以使用Rx.Observable.fromEvent (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md)。

    这是我使用 Bacon.js 做类似事情的方法,它有一个非常相似的 API:https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13

    所以在 Bacon.js 中会这样

    io.on('connection', function(socket){
      Bacon.fromEvent(socket, "sense")
        .filter(function(data) { return true })
        .forEach(function(data) { dealWith(data) })
    })
    

    在 RxJs 中,您可以将 Bacon.fromEvent 替换为 Rx.Observable.fromEvent

    【讨论】:

    • 请删除“我认为”,因为它工作正常。正如您已经描述的那样,用Rx.Observable 替换培根。
    【解决方案3】:

    我在使用 fromEvent 方法时遇到了一些奇怪的问题,所以我更喜欢创建自己的 Observable:

    function RxfromIO (io, eventName) {
      return Rx.Observable.create(observer => {
        io.on(eventName, (data) => {
            observer.onNext(data)
        });
        return {
            dispose : io.close
        }
    });
    

    然后我可以这样使用:

    let $connection = RxfromIO(io, 'connection');
    

    【讨论】:

    • 只用fromEvent就可以了,不用多写代码! const connection$ = Rx.Observable.fromEvent(io, 'connection');
    • 嗨,伙计!您遇到过什么样的问题?
    【解决方案4】:

    你可以使用 rxjs-dom,

    Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver])
    
    
    // an observer for when the socket is open
    var openObserver = Rx.Observer.create(function(e) {
      console.info('socket open');
    
      // Now it is safe to send a message
      socket.onNext('test');
    });
    
    // an observer for when the socket is about to close
    var closingObserver = Rx.Observer.create(function() {
      console.log('socket is about to close');
    });
    
    // create a web socket subject
    socket = Rx.DOM.fromWebSocket(
      'ws://echo.websocket.org',
      null, // no protocol
      openObserver,
      closingObserver);
    
    // subscribing creates the underlying socket and will emit a stream of incoming
    // message events
    socket.subscribe(
      function(e) {
        console.log('message: %s', e.data);
      },
      function(e) {
        // errors and "unclean" closes land here
        console.error('error: %s', e);
      },
      function() {
        // the socket has been closed
        console.info('socket closed');
      }
    );
    

    【讨论】:

    • 看来 Rx.DOM.fromWebSocket 不能与 socket.io-server 一起使用。 Rx.DOM.fromWebSocket 使用 w3c 标准 websocket API。
    【解决方案5】:

    我使用的 ES6 一个衬垫,使用 ES7 bind 语法:
    (将$读作stream

    import { Observable } from 'rxjs'
    
    // create socket
    
    const message$ = Observable.create($ => socket.on('message', ::$.next))
    // translates to: Observable.create($ => socket.on('message', $.next.bind(this)))
    
    // filter example
    const subscription = message$
      .filter(message => message.text !== 'spam')
      //or .filter(({ text }) => text !== 'spam')
      .subscribe(::console.log)
    

    【讨论】:

      【解决方案6】:

      只需使用fromEvent()。这是 Node.js 中的完整示例,但在浏览器中的工作方式相同。请注意,我使用first()takeUntil() 来防止内存泄漏:first() 只监听一个事件然后完成。现在在您侦听的所有其他套接字事件上使用takeUntil(),以便在断开连接时完成观察:

      const app = require('express')();
      const server = require('http').createServer(app);
      const io = require('socket.io')(server);
      const Rx = require('rxjs/Rx');
      
      connection$ = Rx.Observable.fromEvent(io, 'connection');
      
      connection$.subscribe(socket => {
          console.log(`Client connected`);
      
          // Observables
          const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first();
          const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$);
      
          // Subscriptions
          message$.subscribe(data => {
              console.log(`Got message from client with data: ${data}`);
              io.emit('message', data); // Emit to all clients
          });
      
          disconnect$.subscribe(() => {
              console.log(`Client disconnected`);
          })
      });
      
      server.listen(3000);
      

      【讨论】:

      • 你好@Mick,很好的示例,但是客户端呢?当通过 fromEvent observable 调用取消订阅时,我正在为如何断开连接而苦苦挣扎。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-24
      • 1970-01-01
      相关资源
      最近更新 更多