【问题标题】:Call redux action within redux-saga inside websocket callback (stomp + sockjs)在 websocket 回调中调用 redux-saga 中的 redux 操作(stomp + sockjs)
【发布时间】:2020-06-18 22:53:36
【问题描述】:

我在我的项目中使用reduxredux-saga。现在使用 WebSocket 我在套接字响应的回调中调用 FETCH_SUCCESS redux 操作时遇到问题。我也尝试将回调设为生成器,但效果不佳。

function* websocketSaga() {
  const socket = new SockJS(`${CONFIG.API_URL}/ws`);
  const stomp = Stomp.over(socket);
  const token = yield select(selectToken);
  stomp.connect(
    {
      Authorization: `Bearer ${token}`,
    },
    frame => {
      stomp.subscribe('/queue/data', message => {
        const response = JSON.parse(message.body);
        console.log(response); // here is the proper response, it works
        put({
          type: FETCH_SUCCESS, // here the FETCH_SUCCESS action is not called
          payload: response.dataResponse,
        });
      });
      ...
      ....
    }
  );
}

或者这个WebSocket应该在redux-saga中以完全不同的方式实现?

【问题讨论】:

  • 亲爱的@heisenberg7584,我认为您的redux-sagaredux 连接实现方式不正确,因为使用my implementation,您可以做任何事情。当然,对于这个小问题,我可以以适当的方式帮助你。如果您同意我先帮助您,请留下一个简单的重新制作您的项目的问题,例如 Github 或 Gitlab 或 Codesandbox 上的一个小项目。 --- 重新制作有问题的项目的缩减版,以展示给其他人

标签: reactjs redux websocket redux-saga stomp


【解决方案1】:

您将无法在回调函数中使用yield put。 Stompjs 对 sagas 一无所知,因此它不知道在给定生成器函数时应该做什么。

虽然不一定是最好的,但最简单的方法是在回调中直接进入 redux 存储,并在不涉及 redux-saga 的情况下调度 action。例如:

import store from 'wherever you setup your store'

// ...
stomp.subscribe('/queue/data', message => {
  const response = JSON.parse(message.body);
  store.dispatch({
    type: FETCH_SUCCESS,
    payload: response.dataResponse,
  });
});

如果您想使用更多的 redux-saga-y 方法,我建议将订阅包装在 event channel 中。事件通道采用基于回调的 API 并将其转换为您可以使用 redux-saga 的效果与之交互的东西,例如 take

以下是创建事件通道的方法:

import { eventChannel } from 'redux-saga';

function createChannel(token) {
  return eventChannel(emitter => {
    const socket = new SockJS(`${CONFIG.API_URL}/ws`);
    const stomp = Stomp.over(socket);
    stomp.connect(
      {
        Authorization: `Bearer ${token}`,
      },
      frame => {
        stomp.subscribe('/queue/data', message => {
          const response = JSON.parse(message.body);
          emitter(response); // This is the value which will be made available to your saga
        });
      }
    );

    // Returning a cleanup function, to be called if the saga completes or is cancelled
    return () => stomp.disconnect();
  });
}

然后你会像这样使用它:

function* websocketSaga() {
  const token = yield select(selectToken);
  const channel = createChannel(token);
  while (true) {
    const response = yield take(channel);
    yield put({
      type: FETCH_SUCCESS,
      payload: response.dataResponse,
    });
  }
}

【讨论】:

  • 我已经找到了这个渠道解决方案,效果很好。在这种情况下,我们如何在我们的商店中有一些 isConnected 标志?哪个是在真假之间更改它的正确位置?另外 - 有时来自频道的响应是错误的 - 如果是正确的响应还是错误,如何区分它们?
【解决方案2】:

Promise 应该是最合适的。只需将回调相关代码包装在一个承诺中,并将resolve 包装在回调函数中。之后使用yield 从promise 中获取数据。我已经用下面的Promise 修改了你的代码。

function* websocketSaga() {
  const socket = new SockJS(`${CONFIG.API_URL}/ws`);
  const stomp = Stomp.over(socket);
  const token = yield select(selectToken);

  const p = new Promise((resolve, reject) => {
    stomp.connect(
      {
        Authorization: `Bearer ${token}`,
      },
      frame => {
        stomp.subscribe('/queue/data', message => {
          const response = JSON.parse(message.body);
          console.log(response); // here is the proper response, it works
          resolve(response); // here resolve the promise, or reject if any error
        });
        ...
        ....
      }
    );
  });

  try {
    const response = yield p;  // here you will get the resolved data
    yield put({
      type: FETCH_SUCCESS, // here the FETCH_SUCCESS action is not called
      payload: response.dataResponse,
    });
  } catch (ex) {
    // handle error here, with rejected value
  }

}

【讨论】:

    【解决方案3】:

    我将为您提供另一种管理方式:创建一个连接到 redux 的组件,您将在其中处理 WS 订阅。该组件不会向 UI 呈现任何内容,但对于处理 redux 商店交互很有用。

    主要思想是,不要把所有东西都放到redux-saga中,试着把它拆分成多个部分,这样更容易维护。

    const socket = new SockJS(`${CONFIG.API_URL}/ws`);
    
    function WSConnection(props) {
      const {token, fetchDone} = props;
      const [stomp, setStomp] = React.useState();
    
      const onMessage = React.useCallback(message => {
        const response = JSON.parse(message.body);
        fetchDone(response.dataResponse);
      }, [fetchDone]);
    
      const onConnect = React.useCallback(frame => {
        const subscription = stomp.subscribe('/queue/data', onMessage);
        // cleanup subscription
        return () => subscription.unsubscribe();
      }, [stomp, onMessage]);
    
      const onError = React.useCallback(error => {
        // some error happened, handle it here
      }, []);
    
      React.useEffect(() => {
        const header = {Authorization: `Bearer ${token}`};
        stomp.connect(header, onConnect, onError);
        // cleanup function
        return () => stomp.disconnect();
      }, [stomp])
    
      React.useEffect(() => {
        setStomp(Stomp.over(socket));
      }, []);
    
      return null;
    }
    
    const mapStateToProps = state => ({
    ... // whatever you need from redux store
    });
    
    const mapDispatchToProps = dispatch => ({
    ... // whatever actions you need to dispatch
    });
    
    export default connect(mapStateToProps, mapDispatchToProps)(WSConnection);
    

    您还可以更进一步,将 stomp 逻辑提取到另一个文件中,然后在需要的地方重复使用。

    将所有内容都放入 redux-saga 并没有错,但它是处理连接到 redux 的组件内部的 WS 连接的一个不错的选择(对于不完全熟悉 redux-saga 和通道等的人来说更容易理解)。

    【讨论】:

      【解决方案4】:

      多年来我拥有相同的堆栈,直到最近我面对websockets 超过Stomp 客户。 以上解决方案在技术上和精神上都不适合我

      原因:

      • 我不喜欢带有Stomp 的通道,因为以更外科手术的方式操作连接的唯一方法是您必须使用全局状态对象(对我来说 - 它是redux)。即使您只存储随机生成的 IDS,这似乎也不正确(使用 unsubscribe 函数它将是......阅读更多 here 关于存储序列化
      • 使用容器的方式又是一个痛苦……(你知道在哪里)。再次redux 和许多底层功能无缘无故地使用
      • 再次使用promises: 的另一种方式,无需通过在生成器中使用promise 来存储有用的连接信息和一些DI。这缩小了实现选择的范围

      所以:

      • 我需要有连接信息(我决定使用状态但不在:redux,组件状态。单例状态)。 Stomp 不会强迫你放置 ID,但我这样做是因为我想自己管理连接
      • 我需要一个入口点,没有:promisesiterators 以及许多对未来的我来说会很痛苦的事情。一个“统治所有人”的地方(如我所愿) - 激活:登录 - 停用:注销 - 订阅:componentDidMount - 退订:componentWillUnmount
      • DI 在一个地方请求(仅在需要时将 store.dispatch 传递给构造函数)// main topic of the question

      我编写了这个非常适合我的实现

      import SockJS from 'sockjs-client';
      import {
          Client,
          IMessage,
          messageCallbackType,
          StompHeaders,
      } from '@stomp/stompjs';
      
      import { Action, Dispatch } from 'redux';
      
      type ConnectionId = string;
      
      interface IServiceConfig {
          url: string;
          dispatch?: Dispatch;
      }
      
      export default class Stomp {
          serviceConfig: IServiceConfig = {
              dispatch: null,
              url: null,
          };
      
          ids: ConnectionId[] = [];
      
          stomp: Client;
      
          constructor(config: IServiceConfig) {
              this.serviceConfig = { ...config };
      
              this.stomp = new Client();
      
              this.stomp.webSocketFactory = () => {
                  return (new SockJS(config.url));
              };
          }
      
          alreadyInQueue = (id: ConnectionId): boolean => {
              return Boolean(this.ids.find(_id => id === _id));
          };
      
          subscribeByDispatchAction = (
              destination: string,
              callback: (message: IMessage) => Action,
              headers: StompHeaders & {
                  id: ConnectionId;
              },
          ): void => {
              const alreadyInQueue = this.alreadyInQueue(headers.id);
      
              if (!alreadyInQueue) {
                  this.stomp.subscribe(
                      destination,
                      (message) => {
                          this.serviceConfig.dispatch(callback(message));
                      },
                      headers,
                  );
      
                  this.ids.push(headers.id);
      
                  return;
              }
      
              console.warn(`Already in queue #${headers.id}`);
          };
      
          subscribe = (
              destination: string,
              callback: messageCallbackType,
              headers: StompHeaders & {
                  id: ConnectionId;
              },
          ): void => {
              const alreadyInQueue = this.alreadyInQueue(headers.id);
      
              if (!alreadyInQueue) {
                  this.stomp.subscribe(
                      destination,
                      (message) => callback(message),
                      headers,
                  );
      
                  this.ids.push(headers.id);
      
                  this.logState('subscribe');
      
                  return;
              }
      
              console.warn(`Failed to subscribe over Socks by #${headers.id}`);
          };
      
          unsubscribe = (id: ConnectionId, headers?: StompHeaders): void => {
              this.stomp.unsubscribe(id, headers);
      
              this.ids.splice(this.ids.indexOf(id), 1);
          };
      
          activate = (): void => {
              this.stomp.activate();
          };
      
          deactivate = (): void => {
              if (this.ids.length === 0) {
                  this.stomp.deactivate();
      
                  return;
              }
      
              for (let i = 0; i < this.ids.length; i++) {
                  this.unsubscribe(this.ids[i]);
              }
      
              /**
               * it seems like it's overkil but
               * for me it works only if i do all
               * the things as you see below
               * - stomp deactivation
               * - closing webSockets manually by using native constant // sockjs-client
               * - closing webSockets instance by using returned value fron factory
               */
              this.stomp.deactivate();
      
              this.stomp.webSocket.close(
                  this.stomp.webSocket.CLOSED,
              );
      
              this.stomp.webSocketFactory().close();
          };
      
          getAllIds = (): readonly ConnectionId[] => {
              return this.ids;
          };
      
          // debug method
          logState = (method: string): void => {
              /* eslint-disable */
              console.group(`Stomp.${method}`);
              console.log('this', this);
              console.log('this.ids', this.getAllIds());
              console.log('this.stomp', this.stomp);
              console.groupEnd();
              /* eslint-enable */
          };
      }
      

      我的配置文件

      import { store } from '~/index';
      
      import Stomp from '~/modules/_Core/services/Stomp';
      import appConfig from '~/modules/Common/services/appConfig';
      
      export const StompService = new Stomp({
          dispatch: store?.dispatch,
          url: `${appConfig.apiV1}/websocket`,
      });
      

      希望对大家有所帮助

      【讨论】:

        猜你喜欢
        • 2022-01-15
        • 2021-11-14
        • 2017-06-14
        • 1970-01-01
        • 1970-01-01
        • 2017-05-14
        • 2017-04-25
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多