1.系统架构
(来源:https://github.com/lynckia/licode/issues/335)
2.nuve模块
(修改:https://blog.csdn.net/u012908515/article/details/53940787)
app.post('/rooms', roomsResource.createRoom);
app.get('/rooms', roomsResource.represent);
app.get('/rooms/:room', roomResource.represent);
app.put('/rooms/:room', roomResource.updateRoom);
app.patch('/rooms/:room', roomResource.patchRoom);
app.delete('/rooms/:room', roomResource.deleteRoom);
app.post('/rooms/:room/tokens', tokensResource.create);
app.post('/services', servicesResource.create);
app.get('/services', servicesResource.represent);
app.get('/services/:service', serviceResource.represent);
app.delete('/services/:service', serviceResource.deleteService);
app.get('/rooms/:room/users', usersResource.getList);
app.get('/rooms/:room/users/:user', userResource.getUser);
app.delete('/rooms/:room/users/:user', userResource.deleteUser);
2.1签名验证
app.get('*', nuveAuthenticator.authenticate);
app.post('*', nuveAuthenticator.authenticate);
app.put('*', nuveAuthenticator.authenticate);
app.patch('*', nuveAuthenticator.authenticate);
app.delete('*', nuveAuthenticator.authenticate);
每一次客户端请求都会进行签名验证。
2.2cloudHandle.js
cloudHandler提供了nuve对EC的调用:获取房间中的所有用户,删除房间,删除指定用户;以及EC对nuve的调用:删除token,EC的添加、状态更新,删除以及保活。
调用方法通过RPCMQ实现,他们共同维护了两个队列,队列A用来传递调用消息,Nuve发布调用消息后,并且会维护一个调用方法和回调函数的字典;EC从队列A中获取调用消息后执行调用方法,并将结果消息push进队列B;Nuve从队列B中获取调用结果消息,并从字典中得到回调函数并执行callback,删除对应字典值。
2.3服务、房间、用户管理
1.只有superService才有权限进行service的相关操作,即配置文件中的superService = config.nuve.superserviceID
2.在room模块中currentService即当前请求的service,会维护一个rooms列表,且创建房间的时候可携带房间的媒体信息:room.mediaConfiguration
3.Nuve中的user管理仅仅提供查询和删除功能,通过房间id,对EC发出获取当前房间用户信息,或者删除该用户
room {name: '', [p2p: bool], [data: {}], _id: ObjectId}
service {name: '', key: '', rooms: Array[room], testRoom: room, testToken: token, _id: ObjectId}
token {host: '', userName: '', room: '', role: '', service: '', creationDate: Date(), [use: int], [p2p: bool], _id: ObjectId}
2.4总结
Nuve功能相当于一个负载均衡,它负责入库的这些信息能够更好的辅助这个功能,ch_policies决定了负载均衡算法,默认取EC队列的首位。EC服务的启动并不归Nuve管理,EC创建成功后在Nuve中入库,Nuve仅仅维护列表以及保活,负责EC的分配。
3.ErizoController
EC是一个service服务,会维护一个房间集合Rooms,创建成功后会在Nuve中进行注册,并且连接amqper
server.listen(global.config.erizoController.listen_port);
// eslint-disable-next-line global-require, import/no-extraneous-dependencies
const io = require('socket.io').listen(server, { log: false });
io.set('transports', ['websocket']);
const addECToCloudHandler = (attempt) => {
if (attempt <= 0) {
log.error('message: addECtoCloudHandler cloudHandler does not respond - fatal');
process.exit(-1);
return;
}
const controller = {
cloudProvider: global.config.cloudProvider.name,
ip: publicIP,
hostname: global.config.erizoController.hostname,
port: global.config.erizoController.port,
ssl: global.config.erizoController.ssl,
};
nuve.addNewErizoController(controller).then((msg) => {
log.info('message: succesfully added to cloudHandler');
publicIP = msg.publicIP;
myId = msg.id;
myState = 2;
startKeepAlives(myId, publicIP);
callback('callback');
}).catch((reason) => {
if (reason === 'timeout') {
log.warn('message: addECToCloudHandler cloudHandler does not respond, ' +
`attemptsLeft: ${attempt}`);
// We'll try it more!
setTimeout(() => {
attempt -= 1;
addECToCloudHandler(attempt);
}, 3000);
} else {
log.error('message: cannot contact cloudHandler');
}
});
};
addECToCloudHandler(5);
连接amqper即RPCMQ,此时会将Nuve对EC方法的调用绑定
amqper.connect(() => {
try {
rooms.on('updated', updateMyState);
amqper.setPublicRPC(rpcPublic);
addToCloudHandler(() => {
const rpcID = `erizoController_${myId}`;
amqper.bind(rpcID, listen);
});
} catch (error) {
log.info(`message: Error in Erizo Controller, ${logger.objectToLog(error)}`);
}
});
在Nuve注册成功后回调会新建一个RPCMQ消息消费队列,同时会对EC服务端口监听“connection”消息,此时会新建房间(若房间已经存在即加入反那个房间),并新建socket连接和client。
const listen = () => {
io.sockets.on('connection', (socket) => {
log.info(`message: socket connected, socketId: ${socket.id}`);
const channel = new Channel(socket, nuve);
channel.on('connected', (token, options, callback) => {
options = options || {};
try {
const room = rooms.getOrCreateRoom(myId, token.room, token.p2p);
options.singlePC = getSinglePCConfig(options.singlePC);
const client = room.createClient(channel, token, options);
log.info(`message: client connected, clientId: ${client.id}, ` +
`singlePC: ${options.singlePC}`);
if (!room.p2p && global.config.erizoController.report.session_events) {
const timeStamp = new Date();
amqper.broadcast('event', { room: room.id,
user: client.id,
type: 'user_connection',
timestamp: timeStamp.getTime() });
}
const streamList = [];
room.streamManager.forEachPublishedStream((stream) => {
streamList.push(stream.getPublicStream());
});
callback('success', { streams: streamList,
id: room.id,
clientId: client.id,
singlePC: options.singlePC,
p2p: room.p2p,
defaultVideoBW: global.config.erizoController.defaultVideoBW,
maxVideoBW: global.config.erizoController.maxVideoBW,
iceServers: global.config.erizoController.iceServers });
} catch (e) {
log.warn('message: error creating Room or Client, error:', e);
}
});
channel.on('reconnected', (clientId) => {
rooms.forEachRoom((room) => {
const client = room.getClientById(clientId);
if (client !== undefined) {
client.setNewChannel(channel);
}
});
});
socket.channel = channel;
});
};
client相当于客户端中的用户,它会根据EC新建的channel监听来自客户端的socket消息,客户端的信令消息就是在这里进行处理的。client监听到事件后会通过room.controller(即roomController)中的方法进行具体实现。
listenToSocketEvents() {
log.debug(`message: Adding listeners to socket events, client.id: ${this.id}`);
this.socketEventListeners.set('sendDataStream', this.onSendDataStream.bind(this));
this.socketEventListeners.set('connectionMessage', this.onConnectionMessage.bind(this));
this.socketEventListeners.set('streamMessage', this.onStreamMessage.bind(this));
this.socketEventListeners.set('streamMessageP2P', this.onStreamMessageP2P.bind(this));
this.socketEventListeners.set('updateStreamAttributes', this.onUpdateStreamAttributes.bind(this));
this.socketEventListeners.set('publish', this.onPublish.bind(this));
this.socketEventListeners.set('subscribe', this.onSubscribe.bind(this));
this.socketEventListeners.set('startRecorder', this.onStartRecorder.bind(this));
this.socketEventListeners.set('stopRecorder', this.onStopRecorder.bind(this));
this.socketEventListeners.set('unpublish', this.onUnpublish.bind(this));
this.socketEventListeners.set('unsubscribe', this.onUnsubscribe.bind(this));
this.socketEventListeners.set('autoSubscribe', this.onAutoSubscribe.bind(this));
this.socketEventListeners.set('getStreamStats', this.onGetStreamStats.bind(this));
this.socketEventListeners.forEach((value, key) => {
this.channel.socketOn(key, value);
});
this.channel.on('disconnect', this.onDisconnect.bind(this));
}
stopListeningToSocketEvents() {
log.debug(`message: Removing listeners to socket events, client.id: ${this.id}`);
this.socketEventListeners.forEach((value, key) => {
this.channel.socketRemoveListener(key, value);
});
}
disconnect() {
this.stopListeningToSocketEvents();
this.channel.disconnect();
}
ecCloudHandler负责EC对ErizoAgent的分配(负载均衡),amqper会对绑定到MQ上的所有“ErizoAgent”消息队列定时广播获取新建的ErizoAgent并放入ErizoAgent列表中。同时负责向ErizoAgent申请创建ErizoJS和删除ErizoJS。
that.getErizoJS = (agentId, internalId, callback) => {
let agentQueue = 'ErizoAgent';
if (getErizoAgent) {
agentQueue = getErizoAgent(agents, agentId);
}
log.info(`message: createErizoJS, agentId: ${agentQueue}`);
amqper.callRpc(agentQueue, 'createErizoJS', [internalId], { callback(resp) {
roomController中会创建一个ErizoList,用于EC这边erizo的分配以及erizo状态的维护以及保活。同时roomController会通过MQ调用ErizoJS的方法。
erizoPosition->erizo(undefine)->ecch.getErizoJS->分配Agent->Agent创建erizo
const erizos = new ErizoList(maxErizosUsedByRoom);
..........
ErizoJS保活
const sendKeepAlive = () => {
erizos.forEachUniqueErizo((erizo) => {
const erizoId = erizo.erizoId;
amqper.callRpc(`ErizoJS_${erizoId}`, 'keepAlive', [], { callback: callbackFor(erizoId) });
});
};
setInterval(sendKeepAlive, KEEPALIVE_INTERVAL);
........
通过ecch申请创建ErizoJS
getErizoJS = (callback, previousPosition = undefined) => {
let agentId;
let erizoIdForAgent;
const erizoPosition = previousPosition !== undefined ? previousPosition : currentErizo += 1;
if (waitForErizoInfoIfPending(erizoPosition, callback)) {
return;
}
const erizo = erizos.get(erizoPosition);
if (!erizo.erizoId) {
erizos.markAsPending(erizoPosition);
} else {
agentId = erizo.agentId;
erizoIdForAgent = erizo.erizoIdForAgent;
}
log.debug(`message: Getting ErizoJS, agentId: ${agentId}, ` +
`erizoIdForAgent: ${erizoIdForAgent}`);
ecch.getErizoJS(agentId, erizoIdForAgent, (gotErizoId, gotAgentId, gotErizoIdForAgent) => {
const theErizo = erizos.get(erizoPosition);
if (!theErizo.erizoId && gotErizoId !== 'timeout') {
erizos.set(erizoPosition, gotErizoId, gotAgentId, gotErizoIdForAgent);
} else if (theErizo.erizoId) {
theErizo.agentId = gotAgentId;
theErizo.erizoIdForAgent = gotErizoIdForAgent;
}
callback(gotErizoId, gotAgentId, gotErizoIdForAgent);
});
};
总结:
ErizoController是一个信令服务器,且负责erizo的负载均衡,分配erizoJS应该在哪个Agent上创建,client监听客户端socket连接,通过roomController调用erizo。
4.ErizoAgent
ErizoAgent是erizoJS的代理,代理启动后会被EC定时扫描到并存入EC的Agent列表中,然后它会被分配给不同的使用者,监听ErizoJS的创建和删除申请,erizoJS的创建就是Agent启动子进程运行erizoJS主程序(父子进程会分离异步)。
const launchErizoJS = (erizo) => {
const id = erizo.id;
log.debug(`message: launching ErizoJS, erizoId: ${id}`);
let erizoProcess; let out; let
err;
const erizoLaunchOptions = ['./../erizoJS/erizoJS.js', id, privateIP, publicIP];
if (global.config.erizoAgent.launchDebugErizoJS) {
erizoLaunchOptions.push('-d');
}
if (global.config.erizoAgent.useIndividualLogFiles) {
out = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a');
err = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a');
erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
{ detached: true, stdio: ['ignore', out, err] });
} else {
erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
{ detached: true, stdio: ['ignore', 'pipe', 'pipe'] });
erizoProcess.stdout.setEncoding('utf8');
erizoProcess.stdout.on('data', (message) => {
printErizoLogMessage(`[erizo-${id}]`, message.replace(/\n$/, ''));
});
erizoProcess.stderr.setEncoding('utf8');
erizoProcess.stderr.on('data', (message) => {
printErizoLogMessage(`[erizo-${id}]`, message.replace(/\n$/, ''));
});
}
erizoProcess.unref();
erizoProcess.on('close', () => {
log.info(`message: closed, erizoId: ${id}`);
erizos.delete(id);
if (out !== undefined) {
fs.close(out, (message) => {
if (message) {
log.error('message: error closing log file, ',
`erizoId: ${id}`, 'error:', message);
}
});
}
if (err !== undefined) {
fs.close(err, (message) => {
if (message) {
log.error('message: error closing log file, ',
`erizoId: ${id}`, 'error:', message);
}
});
}
erizos.fill();
});
log.info(`message: launched new ErizoJS, erizoId: ${id}`);
// eslint-disable-next-line no-param-reassign
erizo.process = erizoProcess;
};
5.ErizoJS
在erizoJS中client依然是与用户一一对应,与EC中的操作是异步的,只有在开始推流即addPublisher的时候才会将对应的client创建,并为对应的stream建立connection,在非singlePC模式下,每一个stream都会与客户端建立一个流连接,每一个输入流是一个Publisher,订阅者是Subscriber,一个client会有多个Publisher,而每个Publisher会有多个Subscriber,每个Publisher都会有一个媒体转发器OneToManyProcessor。底层的C++ erizo实现了 stream connection的建立,以及媒体转发和带宽控制。
/*
* Adds a publisher to the room. This creates a new OneToManyProcessor
* and a new WebRtcConnection. This WebRtcConnection will be the publisher
* of the OneToManyProcessor.
*/
that.addPublisher = (erizoControllerId, clientId, streamId, options, callbackRpc) => {
updateUptimeInfo();
let publisher;
log.info('addPublisher, clientId', clientId, 'streamId', streamId);
const client = getOrCreateClient(erizoControllerId, clientId, options.singlePC);
if (publishers[streamId] === undefined) {
// eslint-disable-next-line no-param-reassign
options.publicIP = that.publicIP;
// eslint-disable-next-line no-param-reassign
options.privateRegexp = that.privateRegexp;
//新建connection的时候会在C++层建立WebRtcConnection,附带的有媒体设置消息,
//同时会创建mediaStream,同时维护一个mediaStream字典
const connection = client.getOrCreateConnection(options);
log.info('message: Adding publisher, ' +
`clientId: ${clientId}, ` +
`streamId: ${streamId}`,
logger.objectToLog(options),
logger.objectToLog(options.metadata));
publisher = new Publisher(clientId, streamId, connection, options);
publishers[streamId] = publisher;
publisher.initMediaStream();
publisher.on('callback', onAdaptSchemeNotify.bind(this, callbackRpc));
publisher.on('periodic_stats', onPeriodicStats.bind(this, streamId, undefined));
publisher.promise.then(() => {
//更新connection状态并初始化,options.createOffer=true就会创建offer,
//并且在connection状态到达onInitialized/onGathered的时候发送offer
connection.init(options.createOffer);
});
connection.onInitialized.then(() => {
callbackRpc('callback', { type: 'initializing', connectionId: connection.id });
});
connection.onReady.then(() => {
callbackRpc('callback', { type: 'ready' });
});
connection.onStarted.then(() => {
callbackRpc('callback', { type: 'started' });
});
if (options.createOffer) {
let onEvent;
if (options.trickleIce) {
onEvent = connection.onInitialized;
} else {
onEvent = connection.onGathered;
}
onEvent.then(() => {
connection.sendOffer();
});
}
} else {
publisher = publishers[streamId];
if (publisher.numSubscribers === 0) {
log.warn('message: publisher already set but no subscribers will ignore, ' +
`code: ${WARN_CONFLICT}, streamId: ${streamId}`,
logger.objectToLog(options.metadata));
} else {
log.warn('message: publisher already set has subscribers will ignore, ' +
`code: ${WARN_CONFLICT}, streamId: ${streamId}`);
}
}
};
6.时序图
(未完待续)