From 74c2003f1e163d96ffcaa4ce62d2b313d2223979 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Tue, 25 Jun 2019 18:14:41 +0200 Subject: [PATCH 1/2] [HttpClient] add ResponseSetMonitor to trigger callbacks on response completion --- .../HttpClient/ResponseSetMonitor.php | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 src/Symfony/Component/HttpClient/ResponseSetMonitor.php diff --git a/src/Symfony/Component/HttpClient/ResponseSetMonitor.php b/src/Symfony/Component/HttpClient/ResponseSetMonitor.php new file mode 100644 index 0000000000000..cb14df4c0be77 --- /dev/null +++ b/src/Symfony/Component/HttpClient/ResponseSetMonitor.php @@ -0,0 +1,155 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient; + +use Symfony\Contracts\HttpClient\Exception\ExceptionInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +/** + * Monitors a set of responses and triggers callbacks as they complete. + * + * @author Nicolas Grekas + */ +final class ResponseSetMonitor implements \Countable +{ + private $client; + private $onHeaders; + private $onBody; + private $onError; + private $responses; + + public function __construct(HttpClientInterface $client, callable $onHeaders = null, callable $onBody = null, callable $onError = null) + { + $this->client = $client; + $this->onHeaders = $onHeaders; + $this->onBody = $onBody; + $this->onError = $onError; + $this->responses = new \SplObjectStorage(); + } + + /** + * Adds a response to the monitored set. + * + * The response must be created with the same client that was passed to the constructor. + */ + public function add(ResponseInterface $response, callable $onHeaders = null, callable $onBody = null, callable $onError = null): void + { + $this->responses[$response] = [$onHeaders, $onBody, $onError]; + $this->tick(); + } + + /** + * Monitors pending responses, moving them forward as network activity happens. + * + * @param float $timeout The maximum duration of the tick + * + * @return int The number of responses remaining in the set after the tick + */ + public function tick(float $timeout = 0.0): int + { + return $this->wait($timeout, false); + } + + /** + * Completes all pending responses. + * + * @param float|null $idleTimeout The maximum inactivy timeout before erroring idle responses + */ + public function complete(float $idleTimeout = null): void + { + $this->wait($idleTimeout, true); + } + + /** + * Cancels all pending responses. + */ + public function cancel(): void + { + foreach ($this->responses as $response) { + $response->cancel(); + } + + $this->responses = new \SplObjectStorage(); + } + + /** + * Returns the number of pending responses. + */ + public function count(): int + { + return \count($this->responses); + } + + public function __destruct() + { + $this->wait(null, true); + } + + private function wait(?float $timeout, bool $errorOnTimeout): int + { + $error = null; + $remainingTimeout = $timeout; + + if (!$errorOnTimeout && $remainingTimeout) { + $startTime = microtime(true); + } + + foreach ($this->client->stream($this->responses, $remainingTimeout) as $response => $chunk) { + try { + if ($chunk->isTimeout() && !$errorOnTimeout) { + continue; + } + + if (!$chunk->isFirst() && !$chunk->isLast()) { + continue; + } + + [$onHeaders, $onBody] = $this->responses[$response]; + $onHeaders = $onHeaders ?? $this->onHeaders; + $onBody = $onBody ?? $this->onBody; + + if (null !== $onHeaders && $chunk->isFirst()) { + $onHeaders($response); + } + + if (null !== $onBody && $chunk->isLast()) { + $onBody($response); + } + + if (null === $onBody || $chunk->isLast()) { + unset($this->responses[$response]); + } + } catch (ExceptionInterface $e) { + [, , $onError] = $this->responses[$response]; + $onError = $onError ?? $this->onError; + unset($this->responses[$response]); + + if (null !== $onError) { + $onError($e, $response); + } else { + $error = $error ?? $e; + } + } finally { + if (!$errorOnTimeout && $remainingTimeout) { + $remainingTimeout = max(0.0, $timeout - microtime(true) + $startTime); + } + } + } + + if (null !== $error) { + throw $error; + } + + return \count($this->responses); + } +} From df6f5bd136105ec7df3e883fbf375fc5edc0b442 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Wed, 26 Jun 2019 19:57:25 +0200 Subject: [PATCH 2/2] ResponseSetMonitor -> CallbackHttpClient --- ...eSetMonitor.php => CallbackHttpClient.php} | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) rename src/Symfony/Component/HttpClient/{ResponseSetMonitor.php => CallbackHttpClient.php} (58%) diff --git a/src/Symfony/Component/HttpClient/ResponseSetMonitor.php b/src/Symfony/Component/HttpClient/CallbackHttpClient.php similarity index 58% rename from src/Symfony/Component/HttpClient/ResponseSetMonitor.php rename to src/Symfony/Component/HttpClient/CallbackHttpClient.php index cb14df4c0be77..a8c3ed3ab4460 100644 --- a/src/Symfony/Component/HttpClient/ResponseSetMonitor.php +++ b/src/Symfony/Component/HttpClient/CallbackHttpClient.php @@ -14,38 +14,62 @@ use Symfony\Contracts\HttpClient\Exception\ExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; use Symfony\Contracts\HttpClient\ResponseInterface; +use Symfony\Contracts\HttpClient\ResponseStreamInterface; /** - * Monitors a set of responses and triggers callbacks as they complete. + * Calls callbacks as responses complete. * * @author Nicolas Grekas */ -final class ResponseSetMonitor implements \Countable +final class CallbackHttpClient implements HttpClientInterface, \Countable { private $client; private $onHeaders; - private $onBody; + private $onContent; private $onError; private $responses; - public function __construct(HttpClientInterface $client, callable $onHeaders = null, callable $onBody = null, callable $onError = null) + public function __construct(HttpClientInterface $client) { $this->client = $client; - $this->onHeaders = $onHeaders; - $this->onBody = $onBody; - $this->onError = $onError; $this->responses = new \SplObjectStorage(); } + public function withCallbacks(?callable $onHeaders, callable $onContent = null, callable $onError = null): self + { + $new = clone $this; + + $new->responses = $this->responses; + $new->onHeaders = $onHeaders; + $new->onContent = $onContent; + $new->onError = $onError; + + return $new; + } + /** * Adds a response to the monitored set. * * The response must be created with the same client that was passed to the constructor. */ - public function add(ResponseInterface $response, callable $onHeaders = null, callable $onBody = null, callable $onError = null): void + public function request(string $method, string $url, array $options): ResponseInterface + { + $response = $this->client->request($method, $url, $options); + + if (null !== $this->onHeaders || null !== $this->onContent || null !== $this->onError) { + $this->responses[$response] = [$onHeaders, $onContent, $onError]; + $this->tick(); + } + + return $response; + } + + /** + * {@inheritdoc} + */ + public function stream($responses, float $timeout = null): ResponseStreamInterface { - $this->responses[$response] = [$onHeaders, $onBody, $onError]; - $this->tick(); + return new CallbackResponseStream($this->client->stream($responses, $timeout)); } /** @@ -79,7 +103,7 @@ public function cancel(): void $response->cancel(); } - $this->responses = new \SplObjectStorage(); + $this->responses->removeAll($this->responses); } /** @@ -104,41 +128,17 @@ private function wait(?float $timeout, bool $errorOnTimeout): int $startTime = microtime(true); } - foreach ($this->client->stream($this->responses, $remainingTimeout) as $response => $chunk) { - try { - if ($chunk->isTimeout() && !$errorOnTimeout) { - continue; - } - - if (!$chunk->isFirst() && !$chunk->isLast()) { - continue; - } - - [$onHeaders, $onBody] = $this->responses[$response]; - $onHeaders = $onHeaders ?? $this->onHeaders; - $onBody = $onBody ?? $this->onBody; + $stream = $this->client->stream($this->responses, $remainingTimeout); + $stream = new CallbackResponseStream($stream, true); - if (null !== $onHeaders && $chunk->isFirst()) { - $onHeaders($response); - } - - if (null !== $onBody && $chunk->isLast()) { - $onBody($response); - } - - if (null === $onBody || $chunk->isLast()) { - unset($this->responses[$response]); + foreach ($stream as $chunk) { + try { + if ($chunk->isTimeout() && $errorOnTimeout) { + // throw an exception on timeout + $chunk->isFirst(); } } catch (ExceptionInterface $e) { - [, , $onError] = $this->responses[$response]; - $onError = $onError ?? $this->onError; - unset($this->responses[$response]); - - if (null !== $onError) { - $onError($e, $response); - } else { - $error = $error ?? $e; - } + $error = $error ?? $e; } finally { if (!$errorOnTimeout && $remainingTimeout) { $remainingTimeout = max(0.0, $timeout - microtime(true) + $startTime);