-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[HttpClient] Add portable HTTP/2 implementation based on Amp's HTTP client #35115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\HttpClient; | ||
|
||
use Amp\CancelledException; | ||
use Amp\Http\Client\DelegateHttpClient; | ||
use Amp\Http\Client\InterceptedHttpClient; | ||
use Amp\Http\Client\PooledHttpClient; | ||
use Amp\Http\Client\Request; | ||
use Amp\Http\Tunnel\Http1TunnelConnector; | ||
use Psr\Log\LoggerAwareInterface; | ||
use Psr\Log\LoggerAwareTrait; | ||
use Symfony\Component\HttpClient\Exception\TransportException; | ||
use Symfony\Component\HttpClient\Internal\AmpClientState; | ||
use Symfony\Component\HttpClient\Response\AmpResponse; | ||
use Symfony\Component\HttpClient\Response\ResponseStream; | ||
use Symfony\Contracts\HttpClient\HttpClientInterface; | ||
use Symfony\Contracts\HttpClient\ResponseInterface; | ||
use Symfony\Contracts\HttpClient\ResponseStreamInterface; | ||
use Symfony\Contracts\Service\ResetInterface; | ||
|
||
if (!interface_exists(DelegateHttpClient::class)) { | ||
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the "amphp/http-client" package is not installed. Try running "composer require amphp/http-client".'); | ||
} | ||
|
||
/** | ||
* A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client. | ||
* | ||
* @author Nicolas Grekas <p@tchwork.com> | ||
*/ | ||
final class AmpHttpClient implements HttpClientInterface, LoggerAwareInterface, ResetInterface | ||
{ | ||
use HttpClientTrait; | ||
use LoggerAwareTrait; | ||
|
||
private $defaultOptions = self::OPTIONS_DEFAULTS; | ||
|
||
/** @var AmpClientState */ | ||
private $multi; | ||
|
||
/** | ||
* @param array $defaultOptions Default requests' options | ||
* @param callable $clientConfigurator A callable that builds a {@see DelegateHttpClient} from a {@see PooledHttpClient}; | ||
* passing null builds an {@see InterceptedHttpClient} with 2 retries on failures | ||
* @param int $maxHostConnections The maximum number of connections to a single host | ||
* @param int $maxPendingPushes The maximum number of pushed responses to accept in the queue | ||
* | ||
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options | ||
*/ | ||
public function __construct(array $defaultOptions = [], callable $clientConfigurator = null, int $maxHostConnections = 6, int $maxPendingPushes = 50) | ||
{ | ||
$this->defaultOptions['buffer'] = $this->defaultOptions['buffer'] ?? \Closure::fromCallable([__CLASS__, 'shouldBuffer']); | ||
|
||
if ($defaultOptions) { | ||
[, $this->defaultOptions] = self::prepareRequest(null, null, $defaultOptions, $this->defaultOptions); | ||
} | ||
|
||
$this->multi = new AmpClientState($clientConfigurator, $maxHostConnections, $maxPendingPushes, $this->logger); | ||
} | ||
|
||
/** | ||
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options | ||
* | ||
* {@inheritdoc} | ||
*/ | ||
public function request(string $method, string $url, array $options = []): ResponseInterface | ||
{ | ||
[$url, $options] = self::prepareRequest($method, $url, $options, $this->defaultOptions); | ||
|
||
$options['proxy'] = self::getProxy($options['proxy'], $url, $options['no_proxy']); | ||
|
||
if (null !== $options['proxy'] && !class_exists(Http1TunnelConnector::class)) { | ||
throw new \LogicException('You cannot use the "proxy" option as the "amphp/http-tunnel" package is not installed. Try running "composer require amphp/http-tunnel".'); | ||
} | ||
|
||
if ('' !== $options['body'] && 'POST' === $method && !isset($options['normalized_headers']['content-type'])) { | ||
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded'; | ||
} | ||
|
||
if (!isset($options['normalized_headers']['user-agent'])) { | ||
$options['headers'][] = 'User-Agent: Symfony HttpClient/Amp'; | ||
} | ||
|
||
if (0 < $options['max_duration']) { | ||
$options['timeout'] = min($options['max_duration'], $options['timeout']); | ||
} | ||
|
||
if ($options['resolve']) { | ||
$this->multi->dnsCache = $options['resolve'] + $this->multi->dnsCache; | ||
} | ||
|
||
if ($options['peer_fingerprint'] && !isset($options['peer_fingerprint']['pin-sha256'])) { | ||
throw new TransportException(__CLASS__.' supports only "pin-sha256" fingerprints.'); | ||
} | ||
|
||
$request = new Request(implode('', $url), $method); | ||
|
||
if ($options['http_version']) { | ||
switch ((float) $options['http_version']) { | ||
case 1.0: $request->setProtocolVersions(['1.0']); break; | ||
case 1.1: $request->setProtocolVersions(['1.1', '1.0']); break; | ||
default: $request->setProtocolVersions(['2', '1.1', '1.0']); break; | ||
} | ||
} | ||
|
||
foreach ($options['headers'] as $v) { | ||
$h = explode(': ', $v, 2); | ||
$request->addHeader($h[0], $h[1]); | ||
} | ||
|
||
$request->setTcpConnectTimeout(1000 * $options['timeout']); | ||
$request->setTlsHandshakeTimeout(1000 * $options['timeout']); | ||
$request->setTransferTimeout(1000 * $options['max_duration']); | ||
|
||
if ('' !== $request->getUri()->getUserInfo() && !$request->hasHeader('authorization')) { | ||
$auth = explode(':', $request->getUri()->getUserInfo(), 2); | ||
$auth = array_map('rawurldecode', $auth) + [1 => '']; | ||
$request->setHeader('Authorization', 'Basic '.base64_encode(implode(':', $auth))); | ||
} | ||
|
||
return new AmpResponse($this->multi, $request, $options, $this->logger); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function stream($responses, float $timeout = null): ResponseStreamInterface | ||
{ | ||
if ($responses instanceof AmpResponse) { | ||
$responses = [$responses]; | ||
} elseif (!is_iterable($responses)) { | ||
throw new \TypeError(sprintf('%s() expects parameter 1 to be an iterable of AmpResponse objects, %s given.', __METHOD__, \is_object($responses) ? \get_class($responses) : \gettype($responses))); | ||
} | ||
|
||
return new ResponseStream(AmpResponse::stream($responses, $timeout)); | ||
} | ||
|
||
public function reset() | ||
{ | ||
$this->multi->dnsCache = []; | ||
|
||
foreach ($this->multi->pushedResponses as $authority => $pushedResponses) { | ||
foreach ($pushedResponses as [$pushedUrl, $pushDeferred]) { | ||
$pushDeferred->fail(new CancelledException()); | ||
|
||
if ($this->logger) { | ||
$this->logger->debug(sprintf('Unused pushed response: "%s"', $pushedUrl)); | ||
} | ||
} | ||
} | ||
|
||
$this->multi->pushedResponses = []; | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\HttpClient\Internal; | ||
|
||
use Amp\ByteStream\InputStream; | ||
use Amp\ByteStream\ResourceInputStream; | ||
use Amp\Http\Client\RequestBody; | ||
use Amp\Promise; | ||
use Amp\Success; | ||
use Symfony\Component\HttpClient\Exception\TransportException; | ||
|
||
/** | ||
* @author Nicolas Grekas <p@tchwork.com> | ||
* | ||
* @internal | ||
*/ | ||
class AmpBody implements RequestBody, InputStream | ||
{ | ||
private $body; | ||
private $onProgress; | ||
private $offset = 0; | ||
private $length = -1; | ||
private $uploaded; | ||
|
||
public function __construct($body, &$info, \Closure $onProgress) | ||
{ | ||
$this->body = $body; | ||
$this->info = &$info; | ||
$this->onProgress = $onProgress; | ||
|
||
if (\is_resource($body)) { | ||
$this->offset = ftell($body); | ||
$this->length = fstat($body)['size']; | ||
$this->body = new ResourceInputStream($body); | ||
} elseif (\is_string($body)) { | ||
$this->length = \strlen($body); | ||
} | ||
} | ||
|
||
public function createBodyStream(): InputStream | ||
{ | ||
if (null !== $this->uploaded) { | ||
$this->uploaded = null; | ||
|
||
if (\is_string($this->body)) { | ||
$this->offset = 0; | ||
} elseif ($this->body instanceof ResourceInputStream) { | ||
fseek($this->body->getResource(), $this->offset); | ||
} | ||
} | ||
|
||
return $this; | ||
} | ||
nicolas-grekas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
public function getHeaders(): Promise | ||
{ | ||
return new Success([]); | ||
} | ||
|
||
public function getBodyLength(): Promise | ||
{ | ||
return new Success($this->length - $this->offset); | ||
} | ||
|
||
public function read(): Promise | ||
{ | ||
$this->info['size_upload'] += $this->uploaded; | ||
$this->uploaded = 0; | ||
($this->onProgress)(); | ||
|
||
$chunk = $this->doRead(); | ||
$chunk->onResolve(function ($e, $data) { | ||
if (null !== $data) { | ||
$this->uploaded = \strlen($data); | ||
nicolas-grekas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
$this->info['upload_content_length'] = $this->info['size_upload']; | ||
} | ||
}); | ||
|
||
return $chunk; | ||
} | ||
|
||
public static function rewind(RequestBody $body): RequestBody | ||
{ | ||
if (!$body instanceof self) { | ||
return $body; | ||
} | ||
|
||
$body->uploaded = null; | ||
|
||
if ($body->body instanceof ResourceInputStream) { | ||
fseek($body->body->getResource(), $body->offset); | ||
|
||
return new $body($body->body, $body->info, $body->onProgress); | ||
} | ||
|
||
if (\is_string($body->body)) { | ||
$body->offset = 0; | ||
} | ||
|
||
return $body; | ||
} | ||
|
||
private function doRead(): Promise | ||
{ | ||
if ($this->body instanceof ResourceInputStream) { | ||
return $this->body->read(); | ||
} | ||
|
||
if (null === $this->offset || !$this->length) { | ||
return new Success(); | ||
} | ||
|
||
if (\is_string($this->body)) { | ||
$this->offset = null; | ||
|
||
return new Success($this->body); | ||
} | ||
|
||
if ('' === $data = ($this->body)(16372)) { | ||
$this->offset = null; | ||
|
||
return new Success(); | ||
} | ||
|
||
if (!\is_string($data)) { | ||
throw new TransportException(sprintf('Return value of the "body" option callback must be string, %s returned.', \gettype($data))); | ||
} | ||
|
||
return new Success($data); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To run the tests for the component separately: they block when run concurrently with other components, and we don't care as this doesn't happen in practice.