// Source code follows.
/**
* Module dependencies.
*/
import Emitter from "component-emitter";
import Url from "url-parse";
import { Backoff } from "./utils";
/**
* Packet types.
*/
// Packet names in wire order: each name's index is its numeric type code.
const packetlist = ["open", "close", "ping", "pong", "message"];
// Name -> numeric code lookup, built from the ordered list.
const packets = packetlist.reduce((codes, name, code) => {
  codes[name] = code;
  return codes;
}, {});
// Packet object describing a frame that could not be parsed.
const error = { type: "error", data: "parser error" };
/**
 * Reconnecting WebSocket client.
 *
 * Wraps the platform WebSocket, re-emits its lifecycle as `open`, `close`,
 * `error`, `packet` and `message` events, and dials again after a backoff
 * delay when the connection is lost. The event methods used here (`on`,
 * `emit`, `removeAllListeners`) are not defined by the class itself; they
 * must be mixed into the prototype by the surrounding module.
 *
 * Constructing a Socket connects immediately.
 *
 * @param {String} uri Server address (ws:, wss:, http: or https:).
 * @param {Object} [opts]
 * @param {Boolean} [opts.reconnection=true] Reconnect after the connection drops.
 * @param {Number} [opts.reconnectionAttempts=Infinity] Attempts before giving up.
 * @param {Number} [opts.reconnectionDelay=1000] Passed to Backoff as `min`.
 * @param {Number} [opts.reconnectionDelayMax=5000] Passed to Backoff as `max`.
 * @param {Number} [opts.randomizationFactor=0.5] Passed to Backoff as `jitter`.
 *   The historical misspelling `randomizationFacotr` is still accepted.
 * @throws {Error} When no WebSocket implementation is available.
 */
class Socket {
  constructor(uri, opts) {
    opts = opts || {};
    // Only null/undefined fall back to the default, so an explicit 0 is honoured.
    const pick = (value, fallback) => (value == null ? fallback : value);

    this.uri = uri;
    this.url = null;
    this.readyState = "closed";
    this.writable = false;
    this.ws = null;
    this.WebSocketImpl = null;
    this.reconnecting = false;
    // Set by close() so that the resulting close event does not reconnect.
    this.skipReconnect = false;
    this.timer = null;

    this.reconnection(opts.reconnection !== false);
    this.reconnectionAttempts(pick(opts.reconnectionAttempts, Infinity));
    this.reconnectionDelay(pick(opts.reconnectionDelay, 1000));
    this.reconnectionDelayMax(pick(opts.reconnectionDelayMax, 5000));
    this.randomizationFactor(
      pick(pick(opts.randomizationFactor, opts.randomizationFacotr), 0.5)
    );
    this.backoff = new Backoff({
      min: this.reconnectionDelay(),
      max: this.reconnectionDelayMax(),
      jitter: this.randomizationFactor()
    });
    this.check();
  }

  /**
   * Check environment: pick a WebSocket implementation, normalise the
   * address and start connecting.
   *
   * @throws {Error} When neither a global nor a `self` WebSocket exists.
   */
  check() {
    if (typeof WebSocket !== "undefined") {
      this.WebSocketImpl = WebSocket;
    } else if (typeof self !== "undefined") {
      this.WebSocketImpl = self.WebSocket || self.MozWebSocket;
    }
    if (!this.WebSocketImpl) {
      throw new Error("Your environment not support WebSocket.");
    }
    this.parseUri();
    this.open();
  }

  /**
   * Start a connection if the socket is currently closed; otherwise no-op.
   *
   * Note: this drops every `open` and `error` listener (see cleanUp) before
   * installing its own, including listeners registered by callers.
   *
   * @param {Function} [fn] Called with no argument once open, or with the
   *   error if the attempt fails.
   */
  open(fn) {
    if ("closed" !== this.readyState && "" !== this.readyState) {
      return;
    }
    this.cleanUp();
    // cleanUp() cancelled any pending reconnect timer, so the flag guarding
    // it must be released too or later reconnect() calls would be ignored.
    this.reconnecting = false;
    this.skipReconnect = false;

    this.on("open", () => {
      if (fn) {
        fn();
      }
    });
    this.on("error", err => {
      this.readyState = "closed";
      this.cleanUp();
      if (fn) {
        fn(err);
      } else if (!this.reconnecting && this._reconnection && this.backoff.attempts === 0) {
        this.reconnect();
      }
    });

    this.readyState = "opening";
    try {
      this.doOpen();
    } catch (e) {
      // The WebSocket constructor can throw synchronously; do not stay "opening".
      this.readyState = "closed";
      throw e;
    }
  }

  /**
   * Send a text payload over an open connection.
   *
   * @param {String} packets
   * @throws {Error} When the socket is not open or the payload is not a string.
   */
  send(packets) {
    if ("open" !== this.readyState) {
      throw new Error("Transport not open");
    }
    this.write(packets);
  }

  /**
   * Close the connection and stop reconnecting, including a reconnect that
   * is already scheduled.
   */
  close() {
    this.skipReconnect = true;
    this.reconnecting = false;
    clearTimeout(this.timer);
    this.timer = null;
    this.backoff.reset();
    if ("opening" === this.readyState || "open" === this.readyState) {
      this.readyState = "closed";
      this.doClose();
    }
  }

  /**
   * Hand a string to the underlying socket.
   *
   * @param {String} packets
   * @throws {TypeError} When `packets` is not a string.
   */
  write(packets) {
    if (typeof packets !== "string") {
      throw new TypeError("data must be String");
    }
    try {
      this.ws.send(packets);
    } catch (e) {
      // Best effort: a send failure is logged (with its cause), not rethrown.
      console.log("websocket closed before onclose event.", e);
    }
  }

  /**
   * Schedule one reconnect attempt after the next backoff delay, unless one
   * is already pending, close() was requested, or the attempt budget is spent.
   */
  reconnect() {
    if (this.reconnecting || this.skipReconnect) {
      return;
    }
    if (this.backoff.attempts >= this._reconnectionAttempts) {
      console.log(`reconnect failed. total reconnect ${this._reconnectionAttempts} times.`);
      this.backoff.reset();
      return;
    }
    const delay = this.backoff.duration();
    console.log("wait %dms before reconnect attempt", delay);
    this.reconnecting = true;
    this.timer = setTimeout(() => {
      console.log("attempting reconnect.");
      this.reconnecting = false;
      this.open(err => {
        if (err) {
          console.log(`reconnect ${this.backoff.attempts} times failed`);
        } else {
          this.backoff.reset();
        }
      });
    }, delay);
  }

  /**
   * Create the underlying WebSocket and wire its handlers.
   */
  doOpen() {
    const stale = this.ws;
    if (stale) {
      // Detach and close the previous socket so it can neither leak nor
      // deliver late events that would be mistaken for the new connection's.
      stale.onopen = null;
      stale.onclose = null;
      stale.onmessage = null;
      stale.onerror = null;
      stale.close();
    }
    // Prefer the address normalised by parseUri(); fall back to the raw one.
    this.ws = new this.WebSocketImpl(this.url || this.uri);
    this.addEventListeners();
  }

  /**
   * Close the underlying WebSocket if one exists.
   */
  doClose() {
    if (this.ws) {
      this.ws.close();
    }
  }

  /**
   * Route the WebSocket's callbacks to this object's handlers.
   */
  addEventListeners() {
    this.ws.onopen = () => {
      this.onOpen();
    };
    this.ws.onclose = () => {
      this.onClose();
    };
    this.ws.onmessage = ev => {
      this.onData(ev.data);
    };
    this.ws.onerror = e => {
      this.onError("websocket error", e);
    };
  }

  /**
   * Mark the socket open and emit `open`.
   */
  onOpen() {
    this.readyState = "open";
    this.writable = true;
    this.emit("open");
  }

  /**
   * Mark the socket closed, emit `close`, and reconnect unless reconnection
   * is disabled or the close was requested through close().
   */
  onClose() {
    this.readyState = "closed";
    this.writable = false;
    this.emit("close");
    if (this._reconnection && !this.skipReconnect) {
      this.reconnect();
    }
  }

  /**
   * Parse a raw frame and dispatch the resulting packet.
   *
   * @param {*} packet Raw `data` of a WebSocket message event.
   */
  onData(packet) {
    this.onPacket(this.parsePacket(packet));
  }

  /**
   * Emit `packet` for every packet, then `message` or `error` by type.
   * Packets that arrive while the socket is closed are only logged.
   *
   * @param {Object} packet `{ type, data }` as produced by parsePacket.
   */
  onPacket(packet) {
    const isLive =
      "opening" === this.readyState ||
      "open" === this.readyState ||
      "closing" === this.readyState;
    if (!isLive) {
      console.log('packet received with socket readyState "%s"', this.readyState);
      return;
    }
    console.log('socket receive: type "%s", data "%s"', packet.type, packet.data);
    this.emit("packet", packet);
    switch (packet.type) {
      case "error": {
        const err = new Error("server error");
        err.code = packet.data;
        this.onError(err);
        break;
      }
      case "message":
        this.emit("message", packet.data);
        break;
    }
  }

  /**
   * Emit `error` with an Error tagged as a TransportError.
   *
   * @param {String|Error} msg Message, or an existing Error which is kept
   *   as-is so that extra fields such as `code` survive.
   * @param {*} [desc] Extra detail, stored on `err.description`.
   */
  onError(msg, desc) {
    const err = msg instanceof Error ? msg : new Error(msg);
    err.type = "TransportError";
    err.description = desc;
    this.emit("error", err);
  }

  /**
   * Turn a raw frame into a `{ type, data }` packet.
   *
   * @param {*} data
   * @return {Object} The shared parser-error packet when there is no data.
   */
  parsePacket(data) {
    if (data == null) {
      return error;
    }
    if ("string" === typeof data) {
      // Every text frame is a message packet (type code 4) and its whole
      // text is the payload; only an empty frame carries no data.
      const type = packetlist[4];
      return data.length > 0 ? { type, data } : { type };
    }
    // NOTE(review): assumes a sliceable value whose first element is the
    // numeric type code; for ArrayBuffer/Blob `data[0]` is undefined, so
    // `type` would be undefined - confirm what binary frames look like.
    return { type: packetlist[data[0]], data: data.slice(1) };
  }

  /**
   * Get or set whether reconnection is enabled.
   *
   * @param {Boolean} [v] Omit to read the current value.
   * @return {Boolean|Socket} Current value when reading, `this` when setting.
   */
  reconnection(v) {
    if (v === undefined) {
      return this._reconnection;
    }
    this._reconnection = !!v;
    return this;
  }

  /**
   * Get or set the maximum number of reconnect attempts.
   *
   * @param {Number} [v] Omit to read the current value.
   * @return {Number|Socket} Current value when reading, `this` when setting.
   */
  reconnectionAttempts(v) {
    if (v === undefined) {
      return this._reconnectionAttempts;
    }
    this._reconnectionAttempts = v;
    return this;
  }

  /**
   * Get or set the base reconnect delay; a new value is forwarded to the
   * backoff once it exists.
   *
   * @param {Number} [v] Omit to read the current value.
   * @return {Number|Socket} Current value when reading, `this` when setting.
   */
  reconnectionDelay(v) {
    if (v === undefined) {
      return this._reconnectionDelay;
    }
    this._reconnectionDelay = v;
    if (this.backoff) {
      this.backoff.setMin(v);
    }
    return this;
  }

  /**
   * Get or set the backoff jitter factor; a new value is forwarded to the
   * backoff once it exists.
   *
   * @param {Number} [v] Omit to read the current value.
   * @return {Number|Socket} Current value when reading, `this` when setting.
   */
  randomizationFactor(v) {
    if (v === undefined) {
      return this._randomizationFactor;
    }
    this._randomizationFactor = v;
    if (this.backoff) {
      this.backoff.setJitter(v);
    }
    return this;
  }

  /**
   * Misspelled alias of randomizationFactor, kept for existing callers.
   *
   * @param {Number} [v]
   * @return {Number|Socket}
   */
  randomizationFacotr(v) {
    return this.randomizationFactor(v);
  }

  /**
   * Get or set the maximum reconnect delay; a new value is forwarded to the
   * backoff once it exists.
   *
   * @param {Number} [v] Omit to read the current value.
   * @return {Number|Socket} Current value when reading, `this` when setting.
   */
  reconnectionDelayMax(v) {
    if (v === undefined) {
      return this._reconnectionDelayMax;
    }
    this._reconnectionDelayMax = v;
    if (this.backoff) {
      this.backoff.setMax(v);
    }
    return this;
  }

  /**
   * Remove every `open` and `error` listener and cancel the reconnect timer.
   */
  cleanUp() {
    this.removeAllListeners("open");
    this.removeAllListeners("error");
    clearTimeout(this.timer);
    this.timer = null;
  }

  /**
   * Normalise `this.uri` into a ws:// or wss:// address stored on `this.url`.
   * https: and wss: map to wss://, everything else to ws://.
   */
  parseUri() {
    const url = Url(this.uri);
    const isSecure = url.protocol === "https:" || url.protocol === "wss:";
    const protocol = isSecure ? "wss://" : "ws://";
    // `host` rather than `hostname`, so an explicit port is kept.
    this.url = `${protocol}${url.host}${url.pathname}${url.query}`;
  }
}
// Apply the imported Emitter to the prototype: the class calls on(), emit()
// and removeAllListeners() but defines none of them itself.
Emitter(Socket.prototype);
// Socket is this module's default export.
export default Socket;