Skip to content

Commit 2334131

Browse files
[HttpClient] always yield a LastChunk in AsyncResponse on destruction
1 parent ad94b39 commit 2334131

File tree

2 files changed

+157
-108
lines changed

2 files changed

+157
-108
lines changed

src/Symfony/Component/HttpClient/Response/AsyncResponse.php

Lines changed: 138 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public function getHeaders(bool $throw = true): array
6969
$headers = $this->response->getHeaders(false);
7070

7171
if ($throw) {
72-
$this->checkStatusCode($this->getInfo('http_code'));
72+
$this->checkStatusCode();
7373
}
7474

7575
return $headers;
@@ -126,31 +126,50 @@ public function cancel(): void
126126
return;
127127
}
128128

129-
$context = new AsyncContext($this->passthru, $client, $this->response, $this->info, $this->content, $this->offset);
130-
if (null === $stream = ($this->passthru)(new LastChunk(), $context)) {
131-
return;
132-
}
129+
try {
130+
foreach (self::passthru($client, $this, new LastChunk()) as $chunk) {
131+
// no-op
132+
}
133133

134-
if (!$stream instanceof \Iterator) {
135-
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
134+
$this->passthru = null;
135+
} catch (ExceptionInterface $e) {
136+
// ignore any errors when canceling
136137
}
138+
}
137139

140+
public function __destruct()
141+
{
138142
try {
139-
foreach ($stream as $chunk) {
140-
if ($chunk->isLast()) {
141-
break;
143+
$e = null;
144+
145+
if ($this->initializer && null === $this->getInfo('error')) {
146+
$this->getHeaders(true);
147+
}
148+
} catch (\Throwable $e) {
149+
// no-op
150+
} finally {
151+
if (!$this->passthru || null !== $this->getInfo('error')) {
152+
if (null !== $e) {
153+
throw $e;
142154
}
155+
156+
return;
143157
}
144158

145-
$stream->next();
159+
$this->info['canceled'] = true;
160+
$this->info['error'] = 'Response has been canceled.';
146161

147-
if ($stream->valid()) {
148-
throw new \LogicException('A chunk passthru cannot yield after the last chunk.');
162+
try {
163+
foreach (self::passthru($this->client, $this, new LastChunk()) as $chunk) {
164+
// no-op
165+
}
166+
} catch (ExceptionInterface $e) {
167+
// ignore any errors when destructing
149168
}
150169

151-
$stream = $this->passthru = null;
152-
} catch (ExceptionInterface $e) {
153-
// ignore any errors when canceling
170+
if (null !== $e) {
171+
throw $e;
172+
}
154173
}
155174
}
156175

@@ -201,124 +220,135 @@ public static function stream(iterable $responses, float $timeout = null, string
201220
continue;
202221
}
203222

204-
$context = new AsyncContext($r->passthru, $r->client, $r->response, $r->info, $r->content, $r->offset);
205-
if (null === $stream = ($r->passthru)($chunk, $context)) {
206-
if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
207-
throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
208-
}
223+
foreach (self::passthru($r->client, $r, $chunk, $asyncMap) as $chunk) {
224+
yield $r => $chunk;
225+
}
209226

210-
continue;
227+
if ($r->response !== $response && isset($asyncMap[$response])) {
228+
break;
211229
}
212-
$chunk = null;
230+
}
213231

214-
if (!$stream instanceof \Iterator) {
215-
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
232+
if (null === $chunk->getError() && !$chunk->isLast() && $r->response === $response && null !== $r->client) {
233+
throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
234+
}
235+
236+
$responses = [];
237+
foreach ($asyncMap as $response) {
238+
$r = $asyncMap[$response];
239+
240+
if (null !== $r->client) {
241+
$responses[] = $asyncMap[$response];
216242
}
243+
}
244+
}
245+
}
217246

218-
while (true) {
219-
try {
220-
if (null !== $chunk) {
221-
$stream->next();
222-
}
223-
224-
if (!$stream->valid()) {
225-
break;
226-
}
227-
} catch (\Throwable $e) {
228-
$r->info['error'] = $e->getMessage();
229-
$r->response->cancel();
230-
231-
yield $r => $chunk = new ErrorChunk($r->offset, $e);
232-
$chunk->didThrow() ?: $chunk->getContent();
233-
unset($asyncMap[$response]);
234-
break;
235-
}
247+
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
248+
{
249+
$response = $r->response;
250+
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
251+
if (null === $stream = ($r->passthru)($chunk, $context)) {
252+
if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
253+
throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
254+
}
236255

237-
$chunk = $stream->current();
256+
return;
257+
}
258+
$chunk = null;
238259

239-
if (!$chunk instanceof ChunkInterface) {
240-
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
241-
}
260+
if (!$stream instanceof \Iterator) {
261+
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
262+
}
242263

243-
if (null !== $chunk->getError()) {
244-
// no-op
245-
} elseif ($chunk->isFirst()) {
246-
$e = $r->openBuffer();
247-
248-
yield $r => $chunk;
249-
250-
if (null === $e) {
251-
continue;
252-
}
253-
254-
$r->response->cancel();
255-
$chunk = new ErrorChunk($r->offset, $e);
256-
} elseif ('' !== $content = $chunk->getContent()) {
257-
if (null !== $r->shouldBuffer) {
258-
throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
259-
}
260-
261-
if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
262-
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
263-
$r->info['error'] = $chunk->getError();
264-
$r->response->cancel();
265-
}
266-
}
264+
while (true) {
265+
try {
266+
if (null !== $chunk) {
267+
$stream->next();
268+
}
267269

268-
if (null === $chunk->getError()) {
269-
$r->offset += \strlen($content);
270+
if (!$stream->valid()) {
271+
break;
272+
}
273+
} catch (\Throwable $e) {
274+
$r->info['error'] = $e->getMessage();
275+
$r->response->cancel();
270276

271-
yield $r => $chunk;
277+
yield $r => $chunk = new ErrorChunk($r->offset, $e);
278+
$chunk->didThrow() ?: $chunk->getContent();
279+
unset($asyncMap[$response]);
280+
break;
281+
}
272282

273-
if (!$chunk->isLast()) {
274-
continue;
275-
}
283+
$chunk = $stream->current();
276284

277-
$stream->next();
285+
if (!$chunk instanceof ChunkInterface) {
286+
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
287+
}
278288

279-
if ($stream->valid()) {
280-
throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
281-
}
289+
if (null !== $chunk->getError()) {
290+
// no-op
291+
} elseif ($chunk->isFirst()) {
292+
$e = $r->openBuffer();
282293

283-
$r->passthru = null;
284-
} else {
285-
if ($chunk instanceof ErrorChunk) {
286-
$chunk->didThrow(false);
287-
} else {
288-
try {
289-
$chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
290-
} catch (TransportExceptionInterface $e) {
291-
$chunk = new ErrorChunk($chunk->getOffset(), $e);
292-
}
293-
}
294+
yield $r => $chunk;
294295

295-
yield $r => $chunk;
296-
$chunk->didThrow() ?: $chunk->getContent();
297-
}
296+
if ($r->initializer && null === $r->getInfo('error')) {
297+
// Ensure the HTTP status code is always checked
298+
$r->getHeaders(true);
299+
}
298300

299-
unset($asyncMap[$response]);
300-
break;
301+
if (null === $e) {
302+
continue;
301303
}
302304

303-
$stream = $context = null;
305+
$r->response->cancel();
306+
$chunk = new ErrorChunk($r->offset, $e);
307+
} elseif ('' !== $content = $chunk->getContent()) {
308+
if (null !== $r->shouldBuffer) {
309+
throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
310+
}
304311

305-
if ($r->response !== $response && isset($asyncMap[$response])) {
306-
break;
312+
if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
313+
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
314+
$r->info['error'] = $chunk->getError();
315+
$r->response->cancel();
307316
}
308317
}
309318

310-
if (null === $chunk->getError() && !$chunk->isLast() && $r->response === $response && null !== $r->client) {
311-
throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
312-
}
319+
if (null === $chunk->getError()) {
320+
$r->offset += \strlen($content);
313321

314-
$responses = [];
315-
foreach ($asyncMap as $response) {
316-
$r = $asyncMap[$response];
322+
yield $r => $chunk;
317323

318-
if (null !== $r->client) {
319-
$responses[] = $asyncMap[$response];
324+
if (!$chunk->isLast()) {
325+
continue;
326+
}
327+
328+
$stream->next();
329+
330+
if ($stream->valid()) {
331+
throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
332+
}
333+
334+
$r->passthru = null;
335+
} else {
336+
if ($chunk instanceof ErrorChunk) {
337+
$chunk->didThrow(false);
338+
} else {
339+
try {
340+
$chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
341+
} catch (TransportExceptionInterface $e) {
342+
$chunk = new ErrorChunk($chunk->getOffset(), $e);
343+
}
320344
}
345+
346+
yield $r => $chunk;
347+
$chunk->didThrow() ?: $chunk->getContent();
321348
}
349+
350+
unset($asyncMap[$response]);
351+
break;
322352
}
323353
}
324354

src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\HttpClient\Response\AsyncContext;
1616
use Symfony\Component\HttpClient\Response\AsyncResponse;
1717
use Symfony\Contracts\HttpClient\ChunkInterface;
18+
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
1819
use Symfony\Contracts\HttpClient\HttpClientInterface;
1920
use Symfony\Contracts\HttpClient\ResponseInterface;
2021

@@ -163,4 +164,22 @@ public function testProcessingHappensOnce()
163164
$this->assertTrue($chunk->isLast());
164165
$this->assertSame(1, $lastChunks);
165166
}
167+
168+
public function testLastChunkIsYieldOnHttpExceptionAtDestructTime()
169+
{
170+
$lastChunk = null;
171+
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$lastChunk) {
172+
$lastChunk = $chunk;
173+
174+
yield $chunk;
175+
});
176+
177+
try {
178+
$client->request('GET', 'http://localhost:8057/404');
179+
$this->fail(ClientExceptionInterface::class.' expected');
180+
} catch (ClientExceptionInterface $e) {
181+
}
182+
183+
$this->assertTrue($lastChunk->isLast());
184+
}
166185
}

0 commit comments

Comments
 (0)