【问题标题】:Using redux-observable and subscribing to a websocket使用 redux-observable 并订阅 websocket
【发布时间】:2017-07-13 08:29:42
【问题描述】:

试图弄清楚如何让我的史诗开始,它将订阅一个 websocket,然后在发出的事件从 websocket 滚入时调度一些操作。

我看到的示例使用的是多路复用,实际上并没有在 websocket 上调用订阅,我对更改它有点困惑。

我是这样开始的。但我相信 redux observable 想要一个

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType(START_BANK_STREAM).mergeMap(action => {
    console.log("in epic mergeMap");
    socket$
      .subscribe(
        e => {
          console.log("dispatch event " + e);
         distributeEvent(e);
        },
        e => {
          logger.log("AmbassadorsDataService", "Unclean socket closure");
        },
        () => {
          logger.log("AmbassadorsDataService", "Socket connection closed");
        }
      )
  });

   function distributeEvent(event: DataEvent) : void {
        //this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
        if(event.source === '/ambassadors/bank') {
            if( event.command === 'REMOVE') {
                removeDataEvent(event);
            }
            else if(event.command == 'ADD') {
                loadDataEvent(event);
            }
        }
    }

它抛出一个错误: 未捕获的类型错误:您在预期流的位置提供了“未定义”。您可以提供 Observable、Promise、Array 或 Iterable。

任何帮助将不胜感激!

谢谢

【问题讨论】:

    标签: redux-observable


    【解决方案1】:

    在 redux-observable 中,你几乎永远不会(除非你知道我为什么说“几乎”)自己打电话给 subscribe。相反,Observables 是链式的,中间件和其他操作符将为您处理订阅。

    如果您只想为收到的每个事件分派一个操作,那很简单:

    const socket$ = Observable.webSocket<DataEvent>(
      "ws://thewebsocketurl"
    );
    
    const bankStreamEpic = (action$, store) =>
      action$.ofType('START_BANK_STREAM')
        .mergeMap(action =>
          socket$
            .map(payload => ({
              type: 'BANK_STREAM_MESSAGE',
              payload
            }))
        );
    

    您可能(或可能不需要)需要根据从套接字接收到的消息内容进行更多自定义,但实际上,将 其他逻辑放在减速器中可能会更好因为它可能与副作用无关。

    你可能会想要一种方法来停止流,这只是一个takeUntil

    const socket$ = Observable.webSocket<DataEvent>(
      "ws://thewebsocketurl"
    );
    
    const bankStreamEpic = (action$, store) =>
      action$.ofType('START_BANK_STREAM')
        .mergeMap(action =>
          socket$
            .map(payload => ({
              type: 'BANK_STREAM_MESSAGE',
              payload
            }))
            .takeUntil(
              action$.ofType('STOP_BANK_STREAM')
            )
        );
    

    我使用mergeMap 是因为您使用了,但在这种情况下,我认为switchMap 更合适,因为每个都有多个这些似乎是多余的,除非您需要多个并且您的问题只是省略了每个的独特之处。

    【讨论】:

    • 感谢@jayphelps 的详细回答。这现在更有意义了。
    猜你喜欢
    • 2019-08-21
    • 2019-12-03
    • 2019-07-29
    • 1970-01-01
    • 2019-12-29
    • 2018-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多