diff --git a/CurlHttpClient.php b/CurlHttpClient.php index 3e15bef..31eca9d 100644 --- a/CurlHttpClient.php +++ b/CurlHttpClient.php @@ -237,7 +237,7 @@ public function request(string $method, string $url, array $options = []): Respo if (!\is_string($body)) { if (\is_resource($body)) { - $curlopts[\CURLOPT_INFILE] = $body; + $curlopts[\CURLOPT_READDATA] = $body; } else { $curlopts[\CURLOPT_READFUNCTION] = static function ($ch, $fd, $length) use ($body) { static $eof = false; @@ -316,6 +316,9 @@ public function request(string $method, string $url, array $options = []): Respo } foreach ($curlopts as $opt => $value) { + if (\PHP_INT_SIZE === 8 && \defined('CURLOPT_INFILESIZE_LARGE') && \CURLOPT_INFILESIZE === $opt && $value >= 1 << 31) { + $opt = \CURLOPT_INFILESIZE_LARGE; + } if (null !== $value && !curl_setopt($ch, $opt, $value) && \CURLOPT_CERTINFO !== $opt && (!\defined('CURLOPT_HEADEROPT') || \CURLOPT_HEADEROPT !== $opt)) { $constantName = $this->findConstantName($opt); throw new TransportException(sprintf('Curl option "%s" is not supported.', $constantName ?? $opt)); @@ -472,7 +475,7 @@ private function validateExtraCurlOptions(array $options): void \CURLOPT_RESOLVE => 'resolve', \CURLOPT_NOSIGNAL => 'timeout', \CURLOPT_HTTPHEADER => 'headers', - \CURLOPT_INFILE => 'body', + \CURLOPT_READDATA => 'body', \CURLOPT_READFUNCTION => 'body', \CURLOPT_INFILESIZE => 'body', \CURLOPT_POSTFIELDS => 'body', diff --git a/HttpClientTrait.php b/HttpClientTrait.php index 89446ff..4be9afa 100644 --- a/HttpClientTrait.php +++ b/HttpClientTrait.php @@ -356,9 +356,11 @@ private static function normalizeBody($body, array &$normalizedHeaders = []) } }); - $body = http_build_query($body, '', '&'); + if ('' === $body = http_build_query($body, '', '&')) { + return ''; + } - if ('' === $body || !$streams && !str_contains($normalizedHeaders['content-type'][0] ?? '', 'multipart/form-data')) { + if (!$streams && !str_contains($normalizedHeaders['content-type'][0] ?? '', 'multipart/form-data')) { if (!str_contains($normalizedHeaders['content-type'][0] ?? '', 'application/x-www-form-urlencoded')) { $normalizedHeaders['content-type'] = ['Content-Type: application/x-www-form-urlencoded']; } diff --git a/Response/AmpResponse.php b/Response/AmpResponse.php index e01d97e..00001cc 100644 --- a/Response/AmpResponse.php +++ b/Response/AmpResponse.php @@ -179,19 +179,17 @@ private static function schedule(self $response, array &$runningResponses): void /** * @param AmpClientState $multi */ - private static function perform(ClientState $multi, ?array &$responses = null): void + private static function perform(ClientState $multi, ?array $responses = null): void { - if ($responses) { - foreach ($responses as $response) { - try { - if ($response->info['start_time']) { - $response->info['total_time'] = microtime(true) - $response->info['start_time']; - ($response->onProgress)(); - } - } catch (\Throwable $e) { - $multi->handlesActivity[$response->id][] = null; - $multi->handlesActivity[$response->id][] = $e; + foreach ($responses ?? [] as $response) { + try { + if ($response->info['start_time']) { + $response->info['total_time'] = microtime(true) - $response->info['start_time']; + ($response->onProgress)(); } + } catch (\Throwable $e) { + $multi->handlesActivity[$response->id][] = null; + $multi->handlesActivity[$response->id][] = $e; } } } diff --git a/Response/AsyncResponse.php b/Response/AsyncResponse.php index 25f6409..7aa16bc 100644 --- a/Response/AsyncResponse.php +++ b/Response/AsyncResponse.php @@ -12,7 +12,6 @@ namespace Symfony\Component\HttpClient\Response; use Symfony\Component\HttpClient\Chunk\ErrorChunk; -use Symfony\Component\HttpClient\Chunk\FirstChunk; use Symfony\Component\HttpClient\Chunk\LastChunk; use Symfony\Component\HttpClient\Exception\TransportException; use Symfony\Contracts\HttpClient\ChunkInterface; @@ -245,7 +244,7 @@ public static function stream(iterable $responses, ?float $timeout = null, ?stri $wrappedResponses[] = $r->response; if ($r->stream) { - yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap); + yield from self::passthruStream($response = $r->response, $r, $asyncMap, new LastChunk()); if (!isset($asyncMap[$response])) { array_pop($wrappedResponses); @@ -276,15 +275,9 @@ public static function stream(iterable $responses, ?float $timeout = null, ?stri } if (!$r->passthru) { - if (null !== $chunk->getError() || $chunk->isLast()) { - unset($asyncMap[$response]); - } elseif (null !== $r->content && '' !== ($content = $chunk->getContent()) && \strlen($content) !== fwrite($r->content, $content)) { - $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content)))); - $r->info['error'] = $chunk->getError(); - $r->response->cancel(); - } + $r->stream = (static fn () => yield $chunk)(); + yield from self::passthruStream($response, $r, $asyncMap); - yield $r => $chunk; continue; } @@ -347,13 +340,13 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte } $r->stream = $stream; - yield from self::passthruStream($response, $r, null, $asyncMap); + yield from self::passthruStream($response, $r, $asyncMap); } /** * @param \SplObjectStorage|null $asyncMap */ - private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator + private static function passthruStream(ResponseInterface $response, self $r, ?\SplObjectStorage $asyncMap, ?ChunkInterface $chunk = null): \Generator { while (true) { try { diff --git a/Response/CurlResponse.php b/Response/CurlResponse.php index 88cb764..e5dfd3e 100644 --- a/Response/CurlResponse.php +++ b/Response/CurlResponse.php @@ -265,11 +265,11 @@ private static function schedule(self $response, array &$runningResponses): void /** * @param CurlClientState $multi */ - private static function perform(ClientState $multi, ?array &$responses = null): void + private static function perform(ClientState $multi, ?array $responses = null): void { if ($multi->performing) { if ($responses) { - $response = current($responses); + $response = $responses[array_key_first($responses)]; $multi->handlesActivity[(int) $response->handle][] = null; $multi->handlesActivity[(int) $response->handle][] = new TransportException(sprintf('Userland callback cannot use the client nor the response while processing "%s".', curl_getinfo($response->handle, \CURLINFO_EFFECTIVE_URL))); } diff --git a/Response/MockResponse.php b/Response/MockResponse.php index 0493bcb..c6b96c0 100644 --- a/Response/MockResponse.php +++ b/Response/MockResponse.php @@ -167,7 +167,7 @@ protected static function schedule(self $response, array &$runningResponses): vo $runningResponses[0][1][$response->id] = $response; } - protected static function perform(ClientState $multi, array &$responses): void + protected static function perform(ClientState $multi, array $responses): void { foreach ($responses as $response) { $id = $response->id; diff --git a/Response/NativeResponse.php b/Response/NativeResponse.php index 9a5184e..5431288 100644 --- a/Response/NativeResponse.php +++ b/Response/NativeResponse.php @@ -228,7 +228,7 @@ private static function schedule(self $response, array &$runningResponses): void /** * @param NativeClientState $multi */ - private static function perform(ClientState $multi, ?array &$responses = null): void + private static function perform(ClientState $multi, ?array $responses = null): void { foreach ($multi->openHandles as $i => [$pauseExpiry, $h, $buffer, $onProgress]) { if ($pauseExpiry) { diff --git a/Response/TransportResponseTrait.php b/Response/TransportResponseTrait.php index 7b65fd7..95f7e96 100644 --- a/Response/TransportResponseTrait.php +++ b/Response/TransportResponseTrait.php @@ -92,7 +92,7 @@ abstract protected static function schedule(self $response, array &$runningRespo /** * Performs all pending non-blocking operations. */ - abstract protected static function perform(ClientState $multi, array &$responses): void; + abstract protected static function perform(ClientState $multi, array $responses): void; /** * Waits for network activity. @@ -150,10 +150,15 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen $lastActivity = hrtime(true) / 1E9; $elapsedTimeout = 0; - if ($fromLastTimeout = 0.0 === $timeout && '-0' === (string) $timeout) { - $timeout = null; - } elseif ($fromLastTimeout = 0 > $timeout) { - $timeout = -$timeout; + if ((0.0 === $timeout && '-0' === (string) $timeout) || 0 > $timeout) { + $timeout = $timeout ? -$timeout : null; + + /** @var ClientState $multi */ + foreach ($runningResponses as [$multi]) { + if (null !== $multi->lastTimeout) { + $elapsedTimeout = max($elapsedTimeout, $lastActivity - $multi->lastTimeout); + } + } } while (true) { @@ -162,8 +167,7 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen $timeoutMin = $timeout ?? \INF; /** @var ClientState $multi */ - foreach ($runningResponses as $i => [$multi]) { - $responses = &$runningResponses[$i][1]; + foreach ($runningResponses as $i => [$multi, &$responses]) { self::perform($multi, $responses); foreach ($responses as $j => $response) { @@ -171,26 +175,25 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen $timeoutMin = min($timeoutMin, $response->timeout, 1); $chunk = false; - if ($fromLastTimeout && null !== $multi->lastTimeout) { - $elapsedTimeout = hrtime(true) / 1E9 - $multi->lastTimeout; - } - if (isset($multi->handlesActivity[$j])) { $multi->lastTimeout = null; + $elapsedTimeout = 0; } elseif (!isset($multi->openHandles[$j])) { + $hasActivity = true; unset($responses[$j]); continue; } elseif ($elapsedTimeout >= $timeoutMax) { $multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))]; $multi->lastTimeout ??= $lastActivity; + $elapsedTimeout = $timeoutMax; } else { continue; } - while ($multi->handlesActivity[$j] ?? false) { - $hasActivity = true; - $elapsedTimeout = 0; + $lastActivity = null; + $hasActivity = true; + while ($multi->handlesActivity[$j] ?? false) { if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) { if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) { $multi->handlesActivity[$j] = [null, new TransportException(sprintf('Error while processing content unencoding for "%s".', $response->getInfo('url')))]; @@ -227,7 +230,6 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen } } elseif ($chunk instanceof ErrorChunk) { unset($responses[$j]); - $elapsedTimeout = $timeoutMax; } elseif ($chunk instanceof FirstChunk) { if ($response->logger) { $info = $response->getInfo(); @@ -274,10 +276,12 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen if ($chunk instanceof ErrorChunk && !$chunk->didThrow()) { // Ensure transport exceptions are always thrown $chunk->getContent(); + throw new \LogicException('A transport exception should have been thrown.'); } } if (!$responses) { + $hasActivity = true; unset($runningResponses[$i]); } @@ -291,7 +295,7 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen } if ($hasActivity) { - $lastActivity = hrtime(true) / 1E9; + $lastActivity ??= hrtime(true) / 1E9; continue; } diff --git a/Tests/AsyncDecoratorTraitTest.php b/Tests/AsyncDecoratorTraitTest.php index 97e4c42..8e8b433 100644 --- a/Tests/AsyncDecoratorTraitTest.php +++ b/Tests/AsyncDecoratorTraitTest.php @@ -232,6 +232,20 @@ public function testBufferPurePassthru() $this->assertStringContainsString('SERVER_PROTOCOL', $response->getContent()); $this->assertStringContainsString('HTTP_HOST', $response->getContent()); + + $client = new class(parent::getHttpClient(__FUNCTION__)) implements HttpClientInterface { + use AsyncDecoratorTrait; + + public function request(string $method, string $url, array $options = []): ResponseInterface + { + return new AsyncResponse($this->client, $method, $url, $options); + } + }; + + $response = $client->request('GET', 'http://localhost:8057/'); + + $this->assertStringContainsString('SERVER_PROTOCOL', $response->getContent()); + $this->assertStringContainsString('HTTP_HOST', $response->getContent()); } public function testRetryTimeout()