【问题标题】:PHP popen process limit?PHP popen进程限制?
【发布时间】:2017-09-06 08:41:34
【问题描述】:

我正在尝试将耗时的任务提取到单独的进程中。不幸的是,多线程似乎并不是 PHP 的一个选项,但您可以使用 popen 创建新的 php 进程。

用例是这样的:有一个每分钟运行的 cronjob,它检查是否有任何电子邮件活动需要发送。可能需要同时发送多个广告系列,但到目前为止,它每分钟只接收一个广告系列。我想将广告系列的发送提取到一个单独的进程中,以便我可以同时发送多个广告系列。

代码看起来像这样(请注意,这只是一个概念证明):

crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1

ma​​intask.php

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo "Spawn process {$processName}" . PHP_EOL;

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r");
    stream_set_blocking($process, false);
}

subtask.php

$process = $_GET['process_name'];

echo "Started sleeping process {$process}" . PHP_EOL;
sleep(rand(10, 40));
echo "Stopped sleeping process  {$process}" . PHP_EOL;

现在,我遇到的问题是 popen 在任何时候都只会产生 2 个进程,而我试图产生 4 个。我不知道为什么。似乎没有记录任何限制。也许这受到我可用内核数量的限制?

【问题讨论】:

  • 我过去曾成功使用proc_open() 创建多个子进程。我看看能不能挖出一段代码
  • 您的问题解决了吗?你怎么知道你的代码同时只运行 2 个进程?您帖子中的代码不足以对此进行测试。
  • @WeeZel 不幸的是我还没有解决这个问题。我通过使用“ps -aux”监视活动进程来检查只有 2 个正在运行的进程。我可以看到,一旦 subtask.php 进程之一完成,就会启动一个新进程。我想您对提供的代码是正确的。这是试图简单地展示我想要实现的目标,而不发布所有会使事情变得有点过于复杂的实际代码。我会看看我是否可以对我的问题进行工作演示并更新原始帖子。谢谢!
  • 我知道这不是您问题的答案,但您想试试我使用proc_open() 编写的用于运行并行进程的类吗? (我用它来获取whois 数据,其中每个任务可能需要不同的时间才能返回)
  • 我绝对可以试一试!不幸的是,php 文档没有明确说明 popen 和 proc_open 之间的区别,只是 proc_open 提供了更大程度的控制。可能值得一试!

标签: php cron limit popen


【解决方案1】:

我修改了subtask.php,这样您就可以看到每个任务何时开始、何时结束以及它打算等待多长时间。现在您可以看到进程何时开始/停止,您可以减少睡眠时间 - 无需使用 ps -aux 来显示进程何时运行

subtask.php

<?php
$process = $argv[1];

$sleepTime = rand(1, 10);
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
sleep($sleepTime);
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;

我已将 Class 添加到 maintask.php 代码中,以便您可以对其进行测试...当您 queue() 的条目数超过您设置的 maxProcesses(尝试 32)时,乐趣就开始了(尝试 32)
注意:结果将按照完成的顺序返回

ma​​intask.php

<?php
class ParallelProcess
{
    private $maxProcesses = 16; // maximum processes
    private $arrProcessQueue = [];
    private $arrCommandQueue = [];

    private function __construct()
    {
    }

    private function __clone()
    {
    }

    /**
     *
     * @return \static
     */
    public static function create()
    {
        $result = new static();
        return $result;
    }

    /**
     *
     * @param int $maxProcesses
     * @return \static
     */
    public static function load($maxProcesses = 16)
    {
        $result = self::create();
        $result->setMaxProcesses($maxProcesses);
        return $result;
    }

    /**
     * get maximum processes
     *
     * @return int
     */
    public function getMaxProcesses()
    {
        return $this->maxProcesses;
    }

    /**
     * set maximum processes
     *
     * @param int $maxProcesses
     * @return $this
     */
    public function setMaxProcesses($maxProcesses)
    {
        $this->maxProcesses = $maxProcesses;
        return $this;
    }

    /**
     * number of entries in the process queue
     *
     * @return int
     */
    public function processQueueLength()
    {
        $result = count($this->arrProcessQueue);
        return $result;
    }

    /**
     * number of entries in the command queue
     *
     * @return int
     */
    public function commandQueueLength()
    {
        $result = count($this->arrCommandQueue);
        return $result;
    }


    /**
     * process open
     *
     * @staticvar array $arrDescriptorspec
     * @param string $strCommand
     * @return $this
     * @throws \Exception
     */
    private function p_open($strCommand)
    {
        static $arrDescriptorSpec = array(
            0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will reda from
            1 => array('pipe', 'w'), // stdout is a pipe that the child will write to
            2 => array('file', '/dev/null', 'w') // stderr is a pipe that the child will write to
        );

        $arrPipes = array();
        if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
            throw new \Exception("error: proc_open() failed!");
        }

        $resStream = &$arrPipes[1];

        if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) {
            throw new \Exception("error: stream_set_blocking() failed!");
        }

        $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream);
        return $this;
    }

    /**
     * execute any queued commands
     *
     * @return $this
     */
    private function executeCommand()
    {
        while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) {
            $strCommand = array_shift($this->arrCommandQueue);
            $this->p_open($strCommand);
        }
        return $this;
    }

    /**
     * process close
     *
     * @param array $arrQueueEntry
     * @return $this
     */
    private function p_close(array $arrQueueEntry)
    {
        $resProcess = $arrQueueEntry[1];
        $resStream = $arrQueueEntry[2];

        fclose($resStream);

        $this->returnValue = proc_close($resProcess);

        $this->executeCommand();
        return $this;
    }

    /**
     * queue command
     *
     * @param string $strCommand
     * @return $this
     */
    public function queue($strCommand) {
        // put the command on the $arrCommandQueue
        $this->arrCommandQueue[] = $strCommand;
        $this->executeCommand();
        return $this;
    }

    /**
     * read from stream
     *
     * @param resource $resStream
     * @return string
     */
    private static function readStream($resStream)
    {
        $result = '';
        while (($line = fgets($resStream)) !== false) {
            $result .= $line;
        }
        return $result;
    }

    /**
     * read a result from the process queue
     *
     * @return string|false
     */
    private function readProcessQueue()
    {
        $result = false;
        reset($this->arrProcessQueue);
        while ($result === false && list($key, $arrQueueEntry) = each($this->arrProcessQueue)) {
            $arrStatus = proc_get_status($arrQueueEntry[1]);
            if ($arrStatus['running'] === false) {
                array_splice($this->arrProcessQueue, $key, 1);
                $resStream = $arrQueueEntry[2];
                $result = self::readStream($resStream);
                $this->p_close($arrQueueEntry);
            }
        }
        return $result;
    }

    /**
     * get result from process queue
     *
     * @return string|false
     */
    public function readNext()
    {
        $result = false;
        if ($this->processQueueLength() === 0) {
        } else {
            while ($result === false and $this->processQueueLength() > 0) {
                $result = $this->readProcessQueue();
            }
        }
        return $result;
    }
}

set_time_limit(0); // don't timeout

$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process
}

// loop through process queue
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
    // process response
    echo $strResponse;
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-02
    • 2018-12-05
    • 2011-01-13
    • 2023-03-09
    相关资源
    最近更新 更多