虽然 node.js 的异步特性确实很出色，但它仍然只在服务器上的单个线程、单个事件循环中运行。借助 cluster（集群）模块，您可以把 node.js 应用程序分叉为多个各自独立运行的子进程（下文沿用"线程"这一说法），从而更好地利用多核服务器。不久前，我构建了一个游戏服务器架构，它使用 cluster 和 zmq（ZeroMQ）来实现多线程，并让各进程能够方便地通过各种通道互相收发消息。我已将该架构简化为下面的示例，希望能帮助说明如何搭建多线程的 node.js 程序。如果写得有点粗糙，还请见谅——那是几年前的事了，当时我对 node.js 还比较陌生 ;)
理想情况下，您不应该把主进程和子进程的代码全都放在同一个脚本里，但我认为这是方便您复制、粘贴并直接运行的最简单方式 :)
正如您在评论中指出的，我之前给出的集群示例虽然不错，但在分派任务方面并不适合您的具体用例。我时间不多，所以快速改写了这个示例，使其满足您的需求。试一试：
mass-request.js
var cluster = require('cluster');
var zmq = require('zmq');
module.exports = {
_childId : null,
_urls : [],
_threadCount : 1,
_readyThreads : 0,
_callbacks : {},
zmqReceive : null, //the socket we receive on for this thread
zmqMaster : null, //the socket to the master
zmqChildren : {}, //an object storing the sockets for the children
setThreads : function( threadCount ) {
this._threadCount = threadCount;
},
add : function( url , cb ) {
this._urls.push( {url: url, cb : cb } );
},
run : function() {
if( cluster.isMaster ) {
this._masterThread();
} else {
this._childThread();
}
},
_masterThread : function() {
console.log( 'Master Process Starting Up' );
this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://master.ipc' );
//bind handler for messages coming into this process using closure to allow us to access the massrequest object inside the callback
( function( massRequest ) {
this.zmqReceive.on( 'message' , function( msg ) {
msg = JSON.parse(msg);
//was this an online notification?
if( msg && msg.status == 'Online' ) {
massRequest._threadReady();
return; //we're done
}
if( msg && msg.html ) {
//this was a response from a child, call the callback for it
massRequest._callbacks[ msg.sender ].call( massRequest , msg.html );
//send the child another URL
massRequest._sendUrlToChild( msg.sender );
}
} );
}).call( this , this );
//fork 4 child processes and set up the sending sockets for them
for( var i=0; i < this._threadCount; ++i ) {
//set up the sending socket
this.zmqChildren[i] = zmq.socket('push').connect( 'ipc://child_' + i + '.ipc' );
//fork the process and pass it an id
cluster.fork( {
_childId:i
} );
}
},
_sendUrlToChild : function( child ) {
//if there's no urls left, return (this would also be a good place to send a message to the child to exit gracefully)
if( !this._urls.length ) return;
//grab a url to process
var item = this._urls.pop();
//set the callback for the child
this._callbacks[child] = item.cb;
this.zmqChildren[child].send( JSON.stringify( { url:item.url } ) );
},
_processUrls : function() {
for( var i=0; i < this._threadCount; ++i ) {
this._sendUrlToChild( i );
}
},
_threadReady : function() {
if( ++this._readyThreads >= this._threadCount ) {
//all threads are ready, send out urls to start the mayhem
console.log( 'All threads online, starting URL processing' );
this._processUrls();
}
},
_childProcessUrl : function( url ) {
console.log( 'Child Process ' + this.childId + ' Handling URL: ' + url );
//do something here to scrape your content however you see fit
var html = 'HTML';
this.zmqMaster.send( JSON.stringify( { sender:this.childId, html:html } ) );
},
_childThread : function() {
//get the child id that was passed from cluster
this.childId = process.env._childId;
console.log( 'Child Process ' + this.childId + ' Starting Up' );
//bind the pull socket to receive messages to this process
this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://child_' + this.childId + '.ipc' );
//bind the push socket to send to the master
this.zmqMaster = zmq.socket('push').connect('ipc://master.ipc');
//bind handler for messages coming into this process
( function( massRequest ) {
this.zmqReceive.on( 'message' , function( msg ) {
msg = JSON.parse(msg);
console.log( 'Child ' + this.childId + ': ' + msg );
//handle the url
if( msg && msg.url ) massRequest._childProcessUrl( msg.url );
} );
}).call( this , this );
//let the master know we're done setting up
this.zmqMaster.send( JSON.stringify({sender:this.childId,status:'Online'}) );
},
}
demo.js
// Demo driver: queue a handful of URLs, each with a callback that announces
// completion, then start the master/child cluster.
var mr = require( './mass-request.js' );
mr.setThreads( 4 );

// Build a completion callback that reports which URL finished.
function reportDone( url ) {
  return function( resp ) {
    console.log( url + ' is done' );
  };
}

[
  'http://foo.com',
  'http://bar.com',
  'http://alpha.com',
  'http://beta.com',
  'http://theta.com',
  'http://apples.com',
  'http://oranges.com'
].forEach( function( url ) {
  mr.add( url, reportDone( url ) );
} );

mr.run();
把这两个文件放在同一个文件夹中，然后运行 `node demo.js`。
我还应该指出：由于这段代码的基础是从我另一个使用 [0MQ](http://zeromq.org/) 的项目中提取出来的，您需要通过 `npm install zmq` 安装对应的 node.js 模块，当然还需要 cluster 模块。您也可以把 ZMQ 部分换成任何其他进程间通信方式，它只是我恰好熟悉并用过的一种。
简要概述：主线程（即调用 run() 方法的脚本）会启动 X 个子线程（数量可以通过调用 setThreads 设置）。这些子线程完成初始化后，会通过 ZeroMQ 套接字向主线程报告。所有线程就绪后，主脚本会把 URL 分派给各个子线程，让它们去获取 HTML。子线程把 HTML 返回给主脚本，主脚本将其交给该 URL 对应的回调函数，然后再向该子线程分派下一个 URL。这并不是一个完美的方案：回调函数仍然在主（master）线程中执行，可能成为瓶颈，而且很难把它们移到其他线程，因为这些回调可能包含闭包、变量等，如果没有某种对象共享机制，它们在父线程之外可能无法正常工作。
总之，如果你运行这个小演示，会看到 4 个线程在"处理" URL（为简单起见，它们并没有真正加载 URL）。
希望对您有所帮助;)