Skip to content

[HttpClient] add AsyncDecoratorTrait to ease processing responses without breaking async #36779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/Symfony/Component/HttpClient/AsyncDecoratorTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\HttpClient;

use Symfony\Component\HttpClient\Response\AsyncResponse;
use Symfony\Component\HttpClient\Response\ResponseStream;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
use Symfony\Contracts\HttpClient\ResponseStreamInterface;

/**
* Eases with processing responses while streaming them.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
trait AsyncDecoratorTrait
{
private $client;

public function __construct(HttpClientInterface $client = null)
{
$this->client = $client ?? HttpClient::create();
}

/**
* {@inheritdoc}
*
* @return AsyncResponse
*/
abstract public function request(string $method, string $url, array $options = []): ResponseInterface;

/**
* {@inheritdoc}
*/
public function stream($responses, float $timeout = null): ResponseStreamInterface
{
if ($responses instanceof AsyncResponse) {
$responses = [$responses];
} elseif (!is_iterable($responses)) {
throw new \TypeError(sprintf('"%s()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', __METHOD__, get_debug_type($responses)));
}

return new ResponseStream(AsyncResponse::stream($responses, $timeout, static::class));
}
}
5 changes: 5 additions & 0 deletions src/Symfony/Component/HttpClient/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.2.0
-----

* added `AsyncDecoratorTrait` to ease processing responses without breaking async

5.1.0
-----

Expand Down
6 changes: 5 additions & 1 deletion src/Symfony/Component/HttpClient/Chunk/ErrorChunk.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ public function getError(): ?string
/**
* @return bool Whether the wrapped error has been thrown or not
*/
public function didThrow(): bool
public function didThrow(bool $didThrow = null): bool
{
if (null !== $didThrow && $this->didThrow !== $didThrow) {
return !$this->didThrow = $didThrow;
}

return $this->didThrow;
}

Expand Down
4 changes: 2 additions & 2 deletions src/Symfony/Component/HttpClient/Internal/HttplugWaitLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Symfony\Component\HttpClient\Response\ResponseTrait;
use Symfony\Component\HttpClient\Response\CommonResponseTrait;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
Expand Down Expand Up @@ -119,7 +119,7 @@ public function createPsr7Response(ResponseInterface $response, bool $buffer = f
}
}

if (isset(class_uses($response)[ResponseTrait::class])) {
if (isset(class_uses($response)[CommonResponseTrait::class])) {
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
} elseif (!$buffer) {
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
Expand Down
4 changes: 2 additions & 2 deletions src/Symfony/Component/HttpClient/Psr18Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
use Psr\Http\Message\StreamInterface;
use Psr\Http\Message\UriFactoryInterface;
use Psr\Http\Message\UriInterface;
use Symfony\Component\HttpClient\Response\ResponseTrait;
use Symfony\Component\HttpClient\Response\CommonResponseTrait;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
Expand Down Expand Up @@ -104,7 +104,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface
}
}

$body = isset(class_uses($response)[ResponseTrait::class]) ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = isset(class_uses($response)[CommonResponseTrait::class]) ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = $this->streamFactory->createStreamFromResource($body);

if ($body->isSeekable()) {
Expand Down
3 changes: 2 additions & 1 deletion src/Symfony/Component/HttpClient/Response/AmpResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
*/
final class AmpResponse implements ResponseInterface
{
use ResponseTrait;
use CommonResponseTrait;
use TransportResponseTrait;

private $multi;
private $options;
Expand Down
175 changes: 175 additions & 0 deletions src/Symfony/Component/HttpClient/Response/AsyncContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\HttpClient\Response;

use Symfony\Component\HttpClient\Chunk\DataChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;

/**
* A DTO to work with AsyncResponse.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class AsyncContext
{
private $passthru;
private $client;
private $response;
private $info = [];
private $content;
private $offset;

public function __construct(&$passthru, HttpClientInterface $client, ResponseInterface &$response, array &$info, $content, int $offset)
{
$this->passthru = &$passthru;
$this->client = $client;
$this->response = &$response;
$this->info = &$info;
$this->content = $content;
$this->offset = $offset;
}

/**
* Returns the HTTP status without consuming the response.
*/
public function getStatusCode(): int
{
return $this->response->getInfo('http_code');
}

/**
* Returns the headers without consuming the response.
*/
public function getHeaders(): array
{
$headers = [];

foreach ($this->response->getInfo('response_headers') as $h) {
if (11 <= \strlen($h) && '/' === $h[4] && preg_match('#^HTTP/\d+(?:\.\d+)? ([123456789]\d\d)(?: |$)#', $h, $m)) {
$headers = [];
} elseif (2 === \count($m = explode(':', $h, 2))) {
$headers[strtolower($m[0])][] = ltrim($m[1]);
}
}

return $headers;
}

/**
* @return resource|null The PHP stream resource where the content is buffered, if it is
*/
public function getContent()
{
return $this->content;
}

/**
* Creates a new chunk of content.
*/
public function createChunk(string $data): ChunkInterface
{
return new DataChunk($this->offset, $data);
}

/**
* Pauses the request for the given number of seconds.
*/
public function pause(float $duration): void
{
if (\is_callable($pause = $this->response->getInfo('pause_handler'))) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this anticipates #37136 to support pausing requests/responses

$pause($duration);
} elseif (0 < $duration) {
usleep(1E6 * $duration);
}
}

/**
* Cancels the request and returns the last chunk to yield.
*/
public function cancel(): ChunkInterface
{
$this->info['canceled'] = true;
$this->info['error'] = 'Response has been canceled.';
$this->response->cancel();

return new LastChunk();
}

/**
* Returns the current info of the response.
*/
public function getInfo(string $type = null)
{
if (null !== $type) {
return $this->info[$type] ?? $this->response->getInfo($type);
}

return $this->info + $this->response->getInfo();
}

/**
* Attaches an info to the response.
*/
public function setInfo(string $type, $value): self
{
if ('canceled' === $type && $value !== $this->info['canceled']) {
throw new \LogicException('You cannot set the "canceled" info directly.');
}

if (null === $value) {
unset($this->info[$type]);
} else {
$this->info[$type] = $value;
}

return $this;
}

/**
* Returns the currently processed response.
*/
public function getResponse(): ResponseInterface
{
return $this->response;
}

/**
* Replaces the currently processed response by doing a new request.
*/
public function replaceRequest(string $method, string $url, array $options = []): ResponseInterface
{
$this->info['previous_info'][] = $this->response->getInfo();

return $this->response = $this->client->request($method, $url, ['buffer' => false] + $options);
}

/**
* Replaces the currently processed response by another one.
*/
public function replaceResponse(ResponseInterface $response): ResponseInterface
{
$this->info['previous_info'][] = $this->response->getInfo();

return $this->response = $response;
}

/**
* Replaces or removes the chunk filter iterator.
*/
public function passthru(callable $passthru = null): void
{
$this->passthru = $passthru;
}
}
Loading