Skip to content

Commit c81d7c1

Browse files
[HttpClient] Add portable implementation based on Amp's HTTP client
1 parent b138fbc commit c81d7c1

File tree

5 files changed

+445
-0
lines changed

5 files changed

+445
-0
lines changed

composer.json

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@
9999
"symfony/yaml": "self.version"
100100
},
101101
"require-dev": {
102+
"amphp/http-client": "^4.0",
103+
"amphp/sync": "1.3.0",
102104
"cache/integration-tests": "dev-master",
103105
"doctrine/annotations": "~1.0",
104106
"doctrine/cache": "~1.6",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpClient;
13+
14+
use Amp\Http\Client\Connection\LimitedConnectionPool;
15+
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
16+
use Amp\Http\Client\HttpClient;
17+
use Amp\Http\Client\HttpClientBuilder;
18+
use Amp\Http\Client\Request;
19+
use Amp\Loop;
20+
use Amp\Sync\LocalKeyedSemaphore;
21+
use Psr\Log\LoggerAwareInterface;
22+
use Psr\Log\LoggerAwareTrait;
23+
use Symfony\Component\HttpClient\Exception\InvalidArgumentException;
24+
use Symfony\Component\HttpClient\Exception\TransportException;
25+
use Symfony\Component\HttpClient\Internal\AmpClientState;
26+
use Symfony\Component\HttpClient\Response\AmpResponse;
27+
use Symfony\Component\HttpClient\Response\MockResponse;
28+
use Symfony\Component\HttpClient\Response\NativeResponse;
29+
use Symfony\Component\HttpClient\Response\ResponseStream;
30+
use Symfony\Contracts\HttpClient\HttpClientInterface;
31+
use Symfony\Contracts\HttpClient\ResponseInterface;
32+
use Symfony\Contracts\HttpClient\ResponseStreamInterface;
33+
34+
/**
35+
* A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client.
36+
*
37+
* @author Nicolas Grekas <p@tchwork.com>
38+
*/
39+
final class AmpHttpClient implements HttpClientInterface, LoggerAwareInterface
40+
{
41+
use HttpClientTrait;
42+
use LoggerAwareTrait;
43+
44+
private $defaultOptions = self::OPTIONS_DEFAULTS;
45+
46+
/** @var AmpClientState */
47+
private $multi;
48+
49+
/**
50+
* @param array $defaultOptions Default requests' options
51+
* @param int $maxHostConnections The maximum number of connections to open
52+
*
53+
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options
54+
*/
55+
public function __construct(array $defaultOptions = [], int $maxHostConnections = 6)
56+
{
57+
$this->defaultOptions['buffer'] = $this->defaultOptions['buffer'] ?? \Closure::fromCallable([__CLASS__, 'shouldBuffer']);
58+
59+
if ($defaultOptions) {
60+
[, $this->defaultOptions] = self::prepareRequest(null, null, $defaultOptions, $this->defaultOptions);
61+
}
62+
63+
$this->multi = new AmpClientState();
64+
$this->multi->maxHostConnections = $maxHostConnections;
65+
}
66+
67+
/**
68+
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options
69+
*
70+
* {@inheritdoc}
71+
*/
72+
public function request(string $method, string $url, array $options = []): ResponseInterface
73+
{
74+
[$url, $options] = self::prepareRequest($method, $url, $options, $this->defaultOptions);
75+
76+
// TODO: handle all options
77+
78+
// TODO: stream the body upload when possible
79+
$options['body'] = self::getBodyAsString($options['body']);
80+
81+
if ('' !== $options['body'] && 'POST' === $method && !isset($options['normalized_headers']['content-type'])) {
82+
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
83+
}
84+
85+
$this->logger && $this->logger->info(sprintf('Request: %s %s', $method, implode('', $url)));
86+
87+
if (!isset($options['normalized_headers']['user-agent'])) {
88+
$options['headers'][] = 'User-Agent: Symfony HttpClient/Amp';
89+
}
90+
91+
if (0 < $options['max_duration']) {
92+
$options['timeout'] = min($options['max_duration'], $options['timeout']);
93+
}
94+
95+
$request = new Request(implode('', $url), $method);
96+
97+
if ($options['http_version']) {
98+
switch ((float) $options['http_version']) {
99+
case 1.0: $request->setProtocolVersions(['1.0']); break;
100+
case 1.1: $request->setProtocolVersions(['1.1', '1.0']); break;
101+
case 2.0: $request->setProtocolVersions(['2', '1.1']); break;
102+
default: $request->setProtocolVersions('https:' === $url['scheme'] ? ['2', '1.1', '1.0'] : ['1.1', '1.0']); break;
103+
}
104+
}
105+
106+
foreach ($options['headers'] as $v) {
107+
$h = explode(': ', $v, 2);
108+
$request->addHeader($h[0], $h[1]);
109+
}
110+
111+
$request->setTcpConnectTimeout(1000 * $options['timeout']);
112+
$request->setTlsHandshakeTimeout(1000 * $options['timeout']);
113+
$request->setTransferTimeout(1000 * $options['max_duration']);
114+
$request->setBody($options['body'] ?? '');
115+
116+
return new AmpResponse($this->multi, $request, $options, $this->logger);
117+
}
118+
119+
/**
120+
* {@inheritdoc}
121+
*/
122+
public function stream($responses, float $timeout = null): ResponseStreamInterface
123+
{
124+
if ($responses instanceof AmpResponse) {
125+
$responses = [$responses];
126+
} elseif (!is_iterable($responses)) {
127+
throw new \TypeError(sprintf('%s() expects parameter 1 to be an iterable of AmpResponse objects, %s given.', __METHOD__, \is_object($responses) ? \get_class($responses) : \gettype($responses)));
128+
}
129+
130+
return new ResponseStream(AmpResponse::stream($responses, $timeout));
131+
}
132+
133+
private static function getBodyAsString($body): string
134+
{
135+
if (\is_resource($body)) {
136+
return stream_get_contents($body);
137+
}
138+
139+
if (!$body instanceof \Closure) {
140+
return $body;
141+
}
142+
143+
$result = '';
144+
145+
while ('' !== $data = $body(self::$CHUNK_SIZE)) {
146+
if (!\is_string($data)) {
147+
throw new TransportException(sprintf('Return value of the "body" option callback must be string, %s returned.', \gettype($data)));
148+
}
149+
150+
$result .= $data;
151+
}
152+
153+
return $result;
154+
}
155+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpClient\Internal;
13+
14+
use Amp\Http\Client\Connection\DefaultConnectionFactory;
15+
use Amp\Http\Client\Connection\LimitedConnectionPool;
16+
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
17+
use Amp\Http\Client\HttpClientBuilder;
18+
use Amp\Socket\DnsConnector;
19+
use Amp\Socket\StaticConnector;
20+
use Amp\Sync\LocalKeyedSemaphore;
21+
22+
/**
23+
* Internal representation of the Amp client's state.
24+
*
25+
* @author Nicolas Grekas <p@tchwork.com>
26+
*
27+
* @internal
28+
*/
29+
final class AmpClientState extends ClientState
30+
{
31+
/** @var HttpClientBuilder[] */
32+
public $clients = [];
33+
34+
/** @var int */
35+
public $maxHostConnections = 0;
36+
37+
public function getClient(string $bindTo)
38+
{
39+
if (isset($this->clients[$bindTo])) {
40+
return $this->clients[$bindTo];
41+
}
42+
43+
if ($bindTo) {
44+
$connector = (file_exists($bindTo) ? 'unix://' : 'tcp://') . $bindTo;
45+
$connector = new StaticConnector($connector, new DnsConnector());
46+
$connector = new DefaultConnectionFactory($connector);
47+
} else {
48+
$connector = null;
49+
}
50+
51+
$pool = new UnlimitedConnectionPool($connector);
52+
53+
if (0 < $this->maxHostConnections) {
54+
$pool = LimitedConnectionPool::byHost($pool, new LocalKeyedSemaphore($this->maxHostConnections));
55+
}
56+
57+
return $this->clients[$bindTo] = (new HttpClientBuilder())
58+
->allowDeprecatedUriUserInfo()
59+
->retry(0)
60+
->usingPool($pool);
61+
}
62+
}

0 commit comments

Comments
 (0)