【问题标题】:PHP pthreads: Add a new object from an "Threaded" to the poolPHP pthreads:将“线程”中的新对象添加到池中
【发布时间】:2017-02-03 23:16:13
【问题描述】:

我在玩 pthreads 3.1.6-dev 和 PHP 7.1。我的目标是创建一个小型网络爬虫。

计划的工作流程是:您将一个 URL 放入池中(可能是主页),然后一个爬虫(扩展 Threaded)从该 URL 中捕获所有链接。经过小过滤后,爬虫应将所有新链接添加到池中(不应将外部链接添加到池中)。或者,爬虫将新的网址提供给“其他人”,然后将其添加到池中。

该过程应继续进行,直到找不到新的 URL。

我的问题是我没有找到可行的解决方案。我当前的绘制如下所示:爬虫提取 url 并将其放入池中。为此,每个 Worker 都有一个对池的引用,以便爬虫可以通过 Worker 访问池对象。

这个解决方案的问题:如果一个“迟到的”线程添加一个新线程到池中,这个新任务将不会执行。

一些演示代码:

class WebWorker extends Worker
{
    public function __construct(WebPool $pool)
    {
        $this->pool = $pool;

        print_r("Create a new worker\n");
    }

    public function run()
    {
        print_r("Worker {$this->getThreadId()}: " . __METHOD__ . "\n");
    }

    public function getPool()
    {
        return $this->pool;
    }

    private $pool;
}

class WebWork extends Threaded
{
    public function run()
    {
        print_r("Webwork from Worker {$this->worker->getThreadId()}\n");

        if (rand(0, 10) > 5) {
            print_r("Webwork {$this->worker->getThreadId()} add new Webwork\n");

            $this->worker->getPool()->submit(new WebWork());
        }
    }
}

class WebPool extends Pool
{
    public function __construct($size, $class)
    {
        parent::__construct($size, $class, [$this]);
    }
}


$pool = new WebPool(2, 'WebWorker');
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());

while ($pool->collect(function ($work) {
    return $work->isGarbage();
})) continue;

$pool->shutdown();

一个示例结果:

Create a new worker
Worker 139878744053504: WebWorker::run
Webwork from Worker 139878744053504
Create a new worker
Worker 139878731872000: WebWorker::run
Webwork from Worker 139878731872000
Webwork from Worker 139878731872000
Webwork 139878731872000 add new Webwork
Webwork from Worker 139878744053504
Create a new worker
Worker 139878719289088: WebWorker::run
Webwork from Worker 139878719289088
Webwork 139878719289088 add new Webwork

谁能告诉我解决这个问题的最佳实践?

【问题讨论】:

    标签: php multithreading pthreads php-7.1


    【解决方案1】:

    问题是您依赖垃圾收集器来阻塞主线程,而实际上您应该使用自己的条件来阻塞。重写 pthreads v3 中的默认垃圾收集器并不是真正必要的,除非您不希望任务在完成执行后立即被收集。

    您的问题的一个可能解决方案是设置一个链接计数器变量,该变量会随着找到的每个新链接(需要抓取)而增加,并为每个已抓取的链接减少。当这个变量达到 0 时,您可以假设网站已被完全爬取,因此您可以安全地关闭您的线程池。

    这是代码中的解决方案:

    <?php
    
    class WebsiteCrawler extends Worker
    {
        public $pool; // expose the pool to our LinkCrawler tasks
    
        public function __construct(Pool $pool)
        {
            $this->pool = $pool;
        }
    }
    
    class LinkCrawler extends Threaded
    {
        private $link;
        public static $domain;
    
        public function __construct(string $link)
        {
            $this->link = $link;
            WebCrawlerPool::$links[] = $link;
            ++WebCrawlerPool::$linksRemaining->count;
            var_dump($link); // for debugging, just to show that it is collecting links
        }
    
        public function run()
        {
            $content = file_get_contents($this->link);
            $domain = preg_quote(self::$domain);
    
            preg_match_all("~href=\"(.+?{$domain}.+?)\"~", $content, $matches); // naive regex to fetch links
    
            $links = $matches[1];
    
            foreach ($links as $link) {
                if (count(WebCrawlerPool::$links) > 9) { // stop at 10 links (for example purposes...)
                    break;
                }
                if (!in_array($link, get_object_vars(WebCrawlerPool::$links), true)) {
                    $this->worker->pool->submit(new LinkCrawler($link));
                }
            }
    
            --WebCrawlerPool::$linksRemaining->count;
        }
    }
    
    class WebCrawlerPool extends Pool
    {
        public static $linksRemaining;
        public static $links;
    
        public function __construct(int $size, string $class, array $ctor = [])
        {
            parent::__construct($size, $class, [$this]);
            self::$links = new Threaded();
            self::$linksRemaining = new Threaded();
            self::$linksRemaining->count = 0;
        }
    }
    
    LinkCrawler::$domain = 'php.net';
    
    $pool = new WebCrawlerPool(8, 'WebsiteCrawler');
    $pool->submit(new LinkCrawler('http://php.net/', $pool)); // kick things off
    
    while (WebCrawlerPool::$linksRemaining->count !== 0);
    
    $pool->shutdown();
    
    print_r(WebCrawlerPool::$links);
    

    当然,以上只是示例代码 - 您可能想要以不同的方式做事。但这里有几点值得注意:

    • $linksRemaining 是一个Threaded 对象,而不是一个简单的整数。这是因为Threaded 对象具有同步内置。这意味着许多上下文可以安全地操作同一个Threaded 对象。通过利用 Threaded 对象的这一属性,您可以简化在 pthread 中编写的代码。
    • 链接 (WebCrawlerPool::$links) 保存在主上下文中创建的 Threaded 对象中。这样做有两个原因。首先,PHP 数组不能在多个上下文中安全地传递和操作(与Threaded obejcts 不同)。其次,Threaded 对象与创建它们的上下文相关联。这意味着我们应该在希望使用它们的最外层上下文中创建它们 - 否则,当创建它们的线程时它们将变得不可访问已加入。

    【讨论】:

      猜你喜欢
      • 2012-11-16
      • 2014-04-12
      • 2012-06-16
      • 2011-02-06
      • 2012-07-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-09-26
      相关资源
      最近更新 更多