我修改了subtask.php,这样您就可以看到每个任务何时开始、何时结束以及它打算等待多长时间。现在您可以看到进程何时开始/停止,您可以减少睡眠时间 - 无需使用 ps -aux 来显示进程何时运行
subtask.php
<?php
$process = $argv[1];
$sleepTime = rand(1, 10);
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
sleep($sleepTime);
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;
我已将 Class 添加到 maintask.php 代码中,以便您可以对其进行测试...当您 queue() 的条目数超过您设置的 maxProcesses(尝试 32)时,乐趣就开始了(尝试 32)
注意:结果将按照完成的顺序返回
maintask.php
<?php
class ParallelProcess
{
private $maxProcesses = 16; // maximum processes
private $arrProcessQueue = [];
private $arrCommandQueue = [];
private function __construct()
{
}
private function __clone()
{
}
/**
*
* @return \static
*/
public static function create()
{
$result = new static();
return $result;
}
/**
*
* @param int $maxProcesses
* @return \static
*/
public static function load($maxProcesses = 16)
{
$result = self::create();
$result->setMaxProcesses($maxProcesses);
return $result;
}
/**
* get maximum processes
*
* @return int
*/
public function getMaxProcesses()
{
return $this->maxProcesses;
}
/**
* set maximum processes
*
* @param int $maxProcesses
* @return $this
*/
public function setMaxProcesses($maxProcesses)
{
$this->maxProcesses = $maxProcesses;
return $this;
}
/**
* number of entries in the process queue
*
* @return int
*/
public function processQueueLength()
{
$result = count($this->arrProcessQueue);
return $result;
}
/**
* number of entries in the command queue
*
* @return int
*/
public function commandQueueLength()
{
$result = count($this->arrCommandQueue);
return $result;
}
/**
* process open
*
* @staticvar array $arrDescriptorspec
* @param string $strCommand
* @return $this
* @throws \Exception
*/
private function p_open($strCommand)
{
static $arrDescriptorSpec = array(
0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will reda from
1 => array('pipe', 'w'), // stdout is a pipe that the child will write to
2 => array('file', '/dev/null', 'w') // stderr is a pipe that the child will write to
);
$arrPipes = array();
if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
throw new \Exception("error: proc_open() failed!");
}
$resStream = &$arrPipes[1];
if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) {
throw new \Exception("error: stream_set_blocking() failed!");
}
$this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream);
return $this;
}
/**
* execute any queued commands
*
* @return $this
*/
private function executeCommand()
{
while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) {
$strCommand = array_shift($this->arrCommandQueue);
$this->p_open($strCommand);
}
return $this;
}
/**
* process close
*
* @param array $arrQueueEntry
* @return $this
*/
private function p_close(array $arrQueueEntry)
{
$resProcess = $arrQueueEntry[1];
$resStream = $arrQueueEntry[2];
fclose($resStream);
$this->returnValue = proc_close($resProcess);
$this->executeCommand();
return $this;
}
/**
* queue command
*
* @param string $strCommand
* @return $this
*/
public function queue($strCommand) {
// put the command on the $arrCommandQueue
$this->arrCommandQueue[] = $strCommand;
$this->executeCommand();
return $this;
}
/**
* read from stream
*
* @param resource $resStream
* @return string
*/
private static function readStream($resStream)
{
$result = '';
while (($line = fgets($resStream)) !== false) {
$result .= $line;
}
return $result;
}
/**
* read a result from the process queue
*
* @return string|false
*/
private function readProcessQueue()
{
$result = false;
reset($this->arrProcessQueue);
while ($result === false && list($key, $arrQueueEntry) = each($this->arrProcessQueue)) {
$arrStatus = proc_get_status($arrQueueEntry[1]);
if ($arrStatus['running'] === false) {
array_splice($this->arrProcessQueue, $key, 1);
$resStream = $arrQueueEntry[2];
$result = self::readStream($resStream);
$this->p_close($arrQueueEntry);
}
}
return $result;
}
/**
* get result from process queue
*
* @return string|false
*/
public function readNext()
{
$result = false;
if ($this->processQueueLength() === 0) {
} else {
while ($result === false and $this->processQueueLength() > 0) {
$result = $this->readProcessQueue();
}
}
return $result;
}
}
set_time_limit(0); // don't timeout
$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes
for ($i = 0; $i < 4; $i++) {
$processName = "Process_{$i}";
echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
$objParallelProcess->queue("php subtask.php {$processName}"); // queue process
}
// loop through process queue
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
// process response
echo $strResponse;
}