【发布时间】:2019-09-23 00:08:17
【问题描述】:
我正在处理我有一个队列 的流程,并且我从一个已知的工作单元开始。当我处理工作单元时,它会导致 零个或多个(未知)工作单元被添加到 Queue。我继续处理队列,直到没有更多工作要执行。
我正在使用 Guzzle 进行概念验证,我接受第一个 URL 来为队列播种,然后处理可能的响应正文导致需要处理的 URL 更多。我的目标是将它们添加到队列中,并让 Guzzle 继续处理它们,直到队列中没有任何东西为止。
在其他情况下,我可以将一个变量定义为队列,并将其通过引用传递给一个函数,以便使用新工作对其进行更新。但是在 Guzzle 异步池的情况下(我认为是处理这个问题的最有效方法),似乎没有一个明确的方法来更新进程中的队列并拥有池执行请求。
Guzzle 是否提供了一种内置方法来从已实现的 Promise 回调中更新池请求列表?
use ArrayIterator;
use GuzzleHttp\Promise\EachPromise;
use GuzzleHttp\TransferStats;
use Psr\Http\Message\ResponseInterface;
// Re-usable callback which prints the URL being requested
function onStats(TransferStats $stats) {
echo sprintf(
'%s (%s)' . PHP_EOL,
$stats->getEffectiveUri(),
$stats->getTransferTime()
);
}
// The queue of work to be performed
$requests = new ArrayIterator([
$client->get('http://httpbin.org/anything', [
'on_stats' => 'onStats',
])
]);
// Process the queue, which results in more work to be performed
$p = (new EachPromise($requests, [
'concurrency' => 50,
'fulfilled' => function(ResponseInterface $response) use ($client, &$requests) {
$hash = bin2hex(random_bytes(10));
$requests[] = $client->get(sprintf('http://httpbin.org/anything/%s', $hash), [
'on_stats' => 'onStats',
]);
},
'rejected' => function($reason) {
echo $reason . PHP_EOL;
},
]))->promise();
// Wait for everything to finish
$p->wait(true);
我的问题似乎与Incrementally add requests to a Guzzle 5.0 Pool (Rolling Requests) 相似,但不同之处在于它们指的是 Guzzle 的不同主要版本。
【问题讨论】:
标签: php asynchronous guzzle