【问题标题】:Using cluster in a Node module在 Node 模块中使用集群
【发布时间】:2014-06-29 02:42:47
【问题描述】:

更新:即使这种特殊情况不现实,根据 cmets,我仍然对如何编写一个使用集群的模块而不用每次都重新运行父进程感兴趣。


我正在尝试编写一个名为 mass-request 的 Node.js 模块,该模块通过将大量 HTTP 请求分发给子进程来加速它们。

我希望,在外面,它是这样工作的。

var mr = require("mass-request"),
    scraper = mr();

for (var i = 0; i < my_urls_to_visit.length; i += 1) {
    scraper.add(my_urls_to_visit[i], function(resp) {
        // do something with response
    }
}

首先,我为批量请求模块组装了一个框架。

var cluster = require("cluster"),
    numCPUs = require("os").cpus().length;

module.exports = function() {
    console.log("hello from mass-request!");
    if (cluster.isMaster) {
        for (var i = 0; i < numCPUs; i += 1) {
            var worker = cluster.fork();             
        }

        return {
            add: function(url, cb) {}       
        }       
    } else {
        console.log("worker " + process.pid + " is born!");
    }  
}

然后我在测试脚本中像这样测试它:

var m = mr();
console.log("hello from test.js!", m);

我希望看到“来自大量请求的您好!”记录了四次(确实如此)。令我惊讶的是,我还看到了四次“来自 test.js 的你好”。显然我不明白cluster.fork() 是如何工作的。它是重新运行整个过程,而不仅仅是第一次调用它的函数吗?

如果是这样,如何在一个模块中使用集群而不用混乱的多进程逻辑困扰使用该模块的人?

【问题讨论】:

  • 在子进程 js 线程中运行请求有什么帮助? Http 请求已经存在于 js 线程之外。见nodejs.org/api/http.html#http_class_http_agent
  • 有趣。那么让两个或多个进程划分许多 URL 调用的工作不会加快进程吗?一个线程进行所有调用,另一个线程处理响应呢?
  • 使用多个 js 线程的唯一原因是 js 线程是否是瓶颈。鉴于 node.js 的异步特性,当 io 也在图片中时,这种情况很少见。因此,只有在您进行 cpu 密集型工作(例如加密)时,将处理分叉给孩子才有意义。 Mozilla 角色就是一个很好的例子。
  • 请注意,传统上,fork() 会创建整个过程的副本。它不会“重新运行”整个进程,但会复制内存,并且从 fork() 的返回开始在两个进程中继续执行。
  • @generalhenry 您提供的链接并不能真正支持您最初的说法,即 http 的东西在与主线程不同的线程上运行,您是否有更多信息?

标签: multithreading node.js cluster-computing


【解决方案1】:

我相信您正在寻找的是setupMaster

来自文档:

cluster.setupMaster([settings])

  • 设置对象
    • exec String 工作文件的文件路径。 (默认=process.argv[1])
    • args 传递给 worker 的数组字符串参数。 (默认=process.argv.slice(2))
    • silent 布尔值是否将输出发送到父级的 stdio。 (默认=false)

setupMaster 用于更改默认的“分叉”行为。调用后,设置将出现在 cluster.settings 中

通过使用 exec 属性,您可以让您的工作人员从不同的模块启动。

重要提示: 如文档所述,这只能调用一次。如果你的模块依赖于这种行为,那么调用者就不能使用cluster,否则整个事情就会崩溃。

例如:

index.js

var cluster = require("cluster"),
  path = require("path"),
  numCPUs = require("os").cpus().length;

console.log("hello from mass-request!");
if (cluster.isMaster) {
  cluster.setupMaster({
    exec: path.join(__dirname, 'worker.js')
  });

  for (var i = 0; i < numCPUs; i += 1) {
    var worker = cluster.fork();
  }

  return {
    add: function (url, cb) {
    }
  }
} else {
  console.log("worker " + process.pid + " is born!");
}

worker.js

console.log("worker " + process.pid + " is born!");

输出

node index.js 
hello from mass-request!
worker 38821 is born!
worker 38820 is born!
worker 38822 is born!
worker 38819 is born!

【讨论】:

  • 那行“return{add: function(url,cb){}}”在做什么?
  • 另外,集群中的worker使用这个功能真的可以发送HTTP响应吗?
【解决方案2】:

虽然 node.js 的异步特性确实使它很棒,但它仍然在单个事件循环中的服务器上的单个线程中运行。使用集群对 node.js 应用程序进行多线程允许您将应用程序的子进程分叉到它们自己的线程中,从而使您能够更好地利用多核服务器。不久前,我构建了一个游戏服务器架构,它使用集群和 zmq (ZeroMQ) 来实现多线程,并使进程能够轻松地通过各种渠道来回发送消息。我已将该架构简化为下面的示例,希望有助于说明如何将多线程 node.js 组合在一起。如果有点粗略,我很抱歉,那是几年前的事了,当时我对节点还比较陌生;)

理想情况下,您不想将主/子的所有内容嵌套在一个脚本中,但我认为这是让您复制/粘贴/运行的最简单方法:)

正如您在评论中提到的,我给出了一个很好的集群示例,但就调度所有内容而言,它并不适合您的特定用例。我没有太多时间,所以我调整了我的示例,使其能够很快地满足您的需求。试一试:

ma​​ss-request.js

var cluster = require('cluster');
var zmq = require('zmq');

module.exports = {
    _childId : null,
    _urls : [],
    _threadCount : 1,
    _readyThreads : 0,
    _callbacks : {},
    zmqReceive : null, //the socket we receive on for this thread
    zmqMaster : null, //the socket to the master
    zmqChildren : {}, //an object storing the sockets for the children
    setThreads : function( threadCount ) {
        this._threadCount = threadCount;
    },
    add : function( url , cb ) {
        this._urls.push( {url: url, cb : cb } );
    },
    run : function() {

        if( cluster.isMaster ) {

            this._masterThread();

        } else {

            this._childThread();

        }

    },
    _masterThread : function() {

        console.log( 'Master Process Starting Up' );

        this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://master.ipc' );

        //bind handler for messages coming into this process using closure to allow us to access the massrequest object inside the callback
        ( function( massRequest ) {
            this.zmqReceive.on( 'message' , function( msg ) {

                msg = JSON.parse(msg);

                //was this an online notification?
                if( msg && msg.status == 'Online' ) {
                    massRequest._threadReady();
                    return; //we're done
                }
                if( msg && msg.html ) {
                    //this was a response from a child, call the callback for it
                    massRequest._callbacks[ msg.sender ].call( massRequest , msg.html );
                    //send the child another URL
                    massRequest._sendUrlToChild( msg.sender );
                }

            } );
        }).call( this , this );

        //fork 4 child processes and set up the sending sockets for them
        for( var i=0; i < this._threadCount; ++i ) {
            //set up the sending socket
            this.zmqChildren[i] = zmq.socket('push').connect( 'ipc://child_' + i + '.ipc' );
            //fork the process and pass it an id
            cluster.fork( {
                _childId:i
            } );
        }

    },
    _sendUrlToChild : function( child ) {
        //if there's no urls left, return (this would also be a good place to send a message to the child to exit gracefully)
        if( !this._urls.length ) return;
        //grab a url to process
        var item = this._urls.pop();
        //set the callback for the child
        this._callbacks[child] = item.cb;
        this.zmqChildren[child].send( JSON.stringify( { url:item.url } ) );
    },
    _processUrls : function() {
        for( var i=0; i < this._threadCount; ++i ) {
            this._sendUrlToChild( i );
        }
    },
    _threadReady : function() {
        if( ++this._readyThreads >= this._threadCount ) {
            //all threads are ready, send out urls to start the mayhem
            console.log( 'All threads online, starting URL processing' );
            this._processUrls();
        }
    },
    _childProcessUrl : function( url ) {
        console.log( 'Child Process ' + this.childId + ' Handling URL: ' + url );
        //do something here to scrape your content however you see fit
        var html = 'HTML';
        this.zmqMaster.send( JSON.stringify( { sender:this.childId, html:html } ) );
    },
    _childThread : function() {

        //get the child id that was passed from cluster
        this.childId = process.env._childId;

        console.log( 'Child Process ' + this.childId + ' Starting Up' );

        //bind the pull socket to receive messages to this process
        this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://child_' + this.childId + '.ipc' );

        //bind the push socket to send to the master
        this.zmqMaster = zmq.socket('push').connect('ipc://master.ipc');

        //bind handler for messages coming into this process
        ( function( massRequest ) {
            this.zmqReceive.on( 'message' , function( msg ) {

                msg = JSON.parse(msg);

                console.log( 'Child ' + this.childId + ': ' + msg );

                //handle the url
                if( msg && msg.url ) massRequest._childProcessUrl( msg.url );

            } );
        }).call( this , this );

        //let the master know we're done setting up
        this.zmqMaster.send( JSON.stringify({sender:this.childId,status:'Online'}) );

    },
}

demo.js

var mr = require( './mass-request.js' );
mr.setThreads( 4 );
mr.add( 'http://foo.com' , function( resp ) {
    console.log( 'http://foo.com is done' );
} );
mr.add( 'http://bar.com' , function( resp ) {
    console.log( 'http://bar.com is done' );
} );
mr.add( 'http://alpha.com' , function( resp ) {
    console.log( 'http://alpha.com is done' );
} );
mr.add( 'http://beta.com' , function( resp ) {
    console.log( 'http://beta.com is done' );
} );
mr.add( 'http://theta.com' , function( resp ) {
    console.log( 'http://theta.com is done' );
} );
mr.add( 'http://apples.com' , function( resp ) {
    console.log( 'http://apples.com is done' );
} );
mr.add( 'http://oranges.com' , function( resp ) {
    console.log( 'http://oranges.com is done' );
} );
mr.run();

将它们放在同一个文件夹中并运行node demo.js

我还应该指出,由于它的基础是从我使用 [0MQ][http://zeromq.org/] 的其他项目之一中提取的,因此您需要将它与 [node.js 模块一起安装][@987654322 @npm install zmq 显然是集群模块。当然,您可以将 ZMQ 部件换成您想要的任何其他进程间通信方法。这恰好是我熟悉并使用过的一种。

简要概述:主线程也称为调用 run() 方法的脚本将启动 X 个子线程(可以通过调用 setThreads 来设置)。这些孩子在完成初始化后通过 ZeroMQ 套接字向主线程报告。一旦所有线程都准备好,主脚本将 url 分派给孩子,以便他们可以运行并获取 HTML。它们将 HTML 返回给主脚本,主脚本将其传递给该 URL 的适当回调函数,然后将另一个 URL 分派给子脚本。虽然这不是一个完美的解决方案,但回调函数仍然会成为主(主)线程中的瓶颈,因为您不能轻易地将它们移到另一个线程。这些回调可能包含闭包/变量/等,如果没有某种对象共享机制,它们可能无法在父线程之外正常工作。

任何人,如果你在这里启动我的小演示,你会看到 4 个线程“处理” url(为简单起见,它们实际上并没有加载 url)。

希望对您有所帮助;)

【讨论】:

  • 谢谢!这是聚类的一个很好的例子。不过,这不是我的问题。这是如何在不同脚本所需的模块中执行此类操作,以便父脚本不必担心检查它是否是主脚本。请参阅原始帖子中的代码示例。干杯!
  • @ChrisWilson 是的,先生。我在我不得不这样做的很短的时间内尽可能地根据您的需要调整了原始示例:X 这是一个工作演示,我认为它很好地满足了您想要做的事情;)demo.js 文件是什么您的模块的用户基本上必须做,其他一切都按要求在后台运行。
  • 我希望我能和你分享赏金,因为我从你深思熟虑的回应中学到了很多。谢谢!
  • 别担心:P 我不是为了赏金;)我很高兴你从中获得了一些有用的信息。
  • 我看不出这与直接使用集群模块有何不同?
猜你喜欢
  • 2015-05-17
  • 1970-01-01
  • 2018-05-19
  • 1970-01-01
  • 2014-06-01
  • 1970-01-01
  • 2020-06-23
  • 2021-03-28
  • 2019-01-07
相关资源
最近更新 更多