在此线程中找到帮助后,我将写下我是如何处理此问题的。它可能对其他人有用。这在很大程度上取决于您选择的服务器端解决方案。
我的做法:
首先,我构建了一个 SubscriptionHandler,它将通过 SubscriptionHandler#setupSubscription 处理 requestStream#subscribeFunction。
SubscriptionHandler 实例化一个 WebSocket(使用 ReconnectingWebSockets 的自定义版本)并将 onmessage 事件附加到一个内部方法(SubscriptionHandler#receiveSubscriptionPayload),该方法会将有效负载添加到相应的请求中。
我们通过 SubscriptionHandler#newSubscription 创建新订阅,它将使用内部属性 SubscriptionHandler.subscriptions 添加此订阅的键控条目(我们在查询和变量上使用 MD5 哈希实用程序);意味着该对象将出现:
SubscriptionHandler.subscriptions = {
[md5hash]: {
query: QueryObject,
variables: SubscriptionVariables,
observer: Observer (contains OnNext method)
}
每当服务器发送订阅响应时,都会调用 SubscriptionHandler#receiveSubscriptionPayload 方法,它会使用 query/variables md5 哈希来识别有效负载属于哪个订阅,然后使用 SubscriptionHandler.subscriptions 观察者 onNext 方法。
这种方式需要服务器返回如下消息:
export type ServerResponseMessageParsed = {
payload: QueryPayload,
request: {
query: string,
variables: Object,
}
}
我不知道这是否是处理订阅的好方法,但它现在适用于我当前的设置。
SubscriptionHandler.js
class SubscriptionHandler {
subscriptions: Object;
subscriptionEnvironment: RelayModernEnvironment;
websocket: Object;
/**
* The SubscriptionHandler constructor. Will setup a new websocket and bind
* it to internal method to handle receving messages from the ws server.
*
* @param {string} websocketUrl - The WebSocket URL to listen to.
* @param {Object} webSocketSettings - The options object.
* See ReconnectingWebSocket.
*/
constructor(websocketUrl: string, webSocketSettings: WebSocketSettings) {
// All subscription hashes and objects will be stored in the
// this.subscriptions attribute on the subscription handler.
this.subscriptions = {};
// Store the given environment internally to be reused when registering new
// subscriptions. This is required as per the requestRelaySubscription spec
// (method requestSubscription).
this.subscriptionEnvironment = null;
// Create a new WebSocket instance to be able to receive messages on the
// given URL. Always opt for default protocol for the RWS, second arg.
this.websocket = new ReconnectingWebSocket(
websocketUrl,
null, // Protocol.
webSocketSettings,
);
// Bind an internal method to handle incoming messages from the websocket.
this.websocket.onmessage = this.receiveSubscriptionPayload;
}
/**
* Method to attach the Relay Environment to the subscription handler.
* This is required as the Network needs to be instantiated with the
* SubscriptionHandler's methods, and the Environment needs the Network Layer.
*
* @param {Object} environment - The apps environment.
*/
attachEnvironment = (environment: RelayModernEnvironment) => {
this.subscriptionEnvironment = environment;
}
/**
* Generates a hash from a given query and variable pair. The method
* used is a recreatable MD5 hash, which is used as a 'key' for the given
* subscription. Using the MD5 hash we can identify what subscription is valid
* based on the query/variable given from the server.
*
* @param {string} query - A string representation of the subscription.
* @param {Object} variables - An object containing all variables used
* in the query.
* @return {string} The MD5 hash of the query and variables.
*/
getHash = (query: string, variables: HashVariables) => {
const queryString = query.replace(/\s+/gm, '');
const variablesString = JSON.stringify(variables);
const hash = md5(queryString + variablesString).toString();
return hash;
}
/**
* Method to be bound to the class websocket instance. The method will be
* called each time the WebSocket receives a message on the subscribed URL
* (see this.websocket options).
*
* @param {string} message - The message received from the websocket.
*/
receiveSubscriptionPayload = (message: ServerResponseMessage) => {
const response: ServerResponseMessageParsed = JSON.parse(message.data);
const { query, variables } = response.request;
const hash = this.getHash(query, variables);
// Fetch the subscription instance from the subscription handlers stored
// subscriptions.
const subscription = this.subscriptions[hash];
if (subscription) {
// Execute the onNext method with the received payload after validating
// that the received hash is currently stored. If a diff occurs, meaning
// no hash is stored for the received response, ignore the execution.
subscription.observer.onNext(response.payload);
} else {
console.warn(Received payload for unregistered hash: ${hash});
}
}
/**
* Method to generate new subscriptions that will be bound to the
* SubscriptionHandler's environment and will be stored internally in the
* instantiated handler object.
*
* @param {string} subscriptionQuery - The query to subscribe to. Needs to
* be a validated subscription type.
* @param {Object} variables - The variables for the passed query.
* @param {Object} configs - A subscription configuration. If
* override is required.
*/
newSubscription = (
subscriptionQuery: GraphQLTaggedNode,
variables: Variables,
configs: GraphQLSubscriptionConfig,
) => {
const config = configs || DEFAULT_CONFIG;
requestSubscription(
this.subscriptionEnvironment,
{
subscription: subscriptionQuery,
variables: {},
...config,
},
);
}
setupSubscription = (
config: ConcreteBatch,
variables: Variables,
cacheConfig: ?CacheConfig,
observer: Observer,
) => {
const query = config.text;
// Get the hash from the given subscriptionQuery and variables. Used to
// identify this specific subscription.
const hash = this.getHash(query, variables);
// Store the newly created subscription request internally to be re-used
// upon message receival or local data updates.
this.subscriptions[hash] = { query, variables };
const subscription = this.subscriptions[hash];
subscription.observer = observer;
// Temp fix to avoid WS Connection state.
setTimeout(() => {
this.websocket.send(JSON.stringify({ query, variables }));
}, 100);
}
}
const subscriptionHandler = new SubscriptionHandler(WS_URL, WS_OPTIONS);
export default subscriptionHandler;