@@ -69,7 +69,7 @@ public function getHeaders(bool $throw = true): array
69
69
$ headers = $ this ->response ->getHeaders (false );
70
70
71
71
if ($ throw ) {
72
- $ this ->checkStatusCode ($ this -> getInfo ( ' http_code ' ) );
72
+ $ this ->checkStatusCode ();
73
73
}
74
74
75
75
return $ headers ;
@@ -126,31 +126,50 @@ public function cancel(): void
126
126
return ;
127
127
}
128
128
129
- $ context = new AsyncContext ( $ this -> passthru , $ client , $ this -> response , $ this -> info , $ this -> content , $ this -> offset );
130
- if ( null === $ stream = ( $ this -> passthru )( new LastChunk (), $ context ) ) {
131
- return ;
132
- }
129
+ try {
130
+ foreach ( self :: passthru ( $ client , $ this , new LastChunk ()) as $ chunk ) {
131
+ // no-op
132
+ }
133
133
134
- if (!$ stream instanceof \Iterator) {
135
- throw new \LogicException (sprintf ('A chunk passthru must return an "Iterator", "%s" returned. ' , get_debug_type ($ stream )));
134
+ $ this ->passthru = null ;
135
+ } catch (ExceptionInterface $ e ) {
136
+ // ignore any errors when canceling
136
137
}
138
+ }
137
139
140
+ public function __destruct ()
141
+ {
138
142
try {
139
- foreach ($ stream as $ chunk ) {
140
- if ($ chunk ->isLast ()) {
141
- break ;
143
+ $ e = null ;
144
+
145
+ if ($ this ->initializer && null === $ this ->getInfo ('error ' )) {
146
+ $ this ->getHeaders (true );
147
+ }
148
+ } catch (\Throwable $ e ) {
149
+ // no-op
150
+ } finally {
151
+ if (!$ this ->passthru || null !== $ this ->getInfo ('error ' )) {
152
+ if (null !== $ e ) {
153
+ throw $ e ;
142
154
}
155
+
156
+ return ;
143
157
}
144
158
145
- $ stream ->next ();
159
+ $ this ->info ['canceled ' ] = true ;
160
+ $ this ->info ['error ' ] = 'Response has been canceled. ' ;
146
161
147
- if ($ stream ->valid ()) {
148
- throw new \LogicException ('A chunk passthru cannot yield after the last chunk. ' );
162
+ try {
163
+ foreach (self ::passthru ($ this ->client , $ this , new LastChunk ()) as $ chunk ) {
164
+ // no-op
165
+ }
166
+ } catch (ExceptionInterface $ e ) {
167
+ // ignore any errors when destructing
149
168
}
150
169
151
- $ stream = $ this -> passthru = null ;
152
- } catch ( ExceptionInterface $ e ) {
153
- // ignore any errors when canceling
170
+ if ( null !== $ e ) {
171
+ throw $ e ;
172
+ }
154
173
}
155
174
}
156
175
@@ -201,124 +220,135 @@ public static function stream(iterable $responses, float $timeout = null, string
201
220
continue ;
202
221
}
203
222
204
- $ context = new AsyncContext ($ r ->passthru , $ r ->client , $ r ->response , $ r ->info , $ r ->content , $ r ->offset );
205
- if (null === $ stream = ($ r ->passthru )($ chunk , $ context )) {
206
- if ($ r ->response === $ response && (null !== $ chunk ->getError () || $ chunk ->isLast ())) {
207
- throw new \LogicException ('A chunk passthru cannot swallow the last chunk. ' );
208
- }
223
+ foreach (self ::passthru ($ r ->client , $ r , $ chunk , $ asyncMap ) as $ chunk ) {
224
+ yield $ r => $ chunk ;
225
+ }
209
226
210
- continue ;
227
+ if ($ r ->response !== $ response && isset ($ asyncMap [$ response ])) {
228
+ break ;
211
229
}
212
- $ chunk = null ;
230
+ }
213
231
214
- if (!$ stream instanceof \Iterator) {
215
- throw new \LogicException (sprintf ('A chunk passthru must return an "Iterator", "%s" returned. ' , get_debug_type ($ stream )));
232
+ if (null === $ chunk ->getError () && !$ chunk ->isLast () && $ r ->response === $ response && null !== $ r ->client ) {
233
+ throw new \LogicException ('A chunk passthru must yield an "isLast()" chunk before ending a stream. ' );
234
+ }
235
+
236
+ $ responses = [];
237
+ foreach ($ asyncMap as $ response ) {
238
+ $ r = $ asyncMap [$ response ];
239
+
240
+ if (null !== $ r ->client ) {
241
+ $ responses [] = $ asyncMap [$ response ];
216
242
}
243
+ }
244
+ }
245
+ }
217
246
218
- while (true ) {
219
- try {
220
- if (null !== $ chunk ) {
221
- $ stream ->next ();
222
- }
223
-
224
- if (!$ stream ->valid ()) {
225
- break ;
226
- }
227
- } catch (\Throwable $ e ) {
228
- $ r ->info ['error ' ] = $ e ->getMessage ();
229
- $ r ->response ->cancel ();
230
-
231
- yield $ r => $ chunk = new ErrorChunk ($ r ->offset , $ e );
232
- $ chunk ->didThrow () ?: $ chunk ->getContent ();
233
- unset($ asyncMap [$ response ]);
234
- break ;
235
- }
247
+ private static function passthru (HttpClientInterface $ client , self $ r , ChunkInterface $ chunk , \SplObjectStorage $ asyncMap = null ): \Generator
248
+ {
249
+ $ response = $ r ->response ;
250
+ $ context = new AsyncContext ($ r ->passthru , $ client , $ r ->response , $ r ->info , $ r ->content , $ r ->offset );
251
+ if (null === $ stream = ($ r ->passthru )($ chunk , $ context )) {
252
+ if ($ r ->response === $ response && (null !== $ chunk ->getError () || $ chunk ->isLast ())) {
253
+ throw new \LogicException ('A chunk passthru cannot swallow the last chunk. ' );
254
+ }
236
255
237
- $ chunk = $ stream ->current ();
256
+ return ;
257
+ }
258
+ $ chunk = null ;
238
259
239
- if (!$ chunk instanceof ChunkInterface ) {
240
- throw new \LogicException (sprintf ('A chunk passthru must yield instances of "%s ", "%s" yielded . ' , ChunkInterface::class, get_debug_type ($ chunk )));
241
- }
260
+ if (!$ stream instanceof \Iterator ) {
261
+ throw new \LogicException (sprintf ('A chunk passthru must return an "Iterator ", "%s" returned . ' , get_debug_type ($ stream )));
262
+ }
242
263
243
- if (null !== $ chunk ->getError ()) {
244
- // no-op
245
- } elseif ($ chunk ->isFirst ()) {
246
- $ e = $ r ->openBuffer ();
247
-
248
- yield $ r => $ chunk ;
249
-
250
- if (null === $ e ) {
251
- continue ;
252
- }
253
-
254
- $ r ->response ->cancel ();
255
- $ chunk = new ErrorChunk ($ r ->offset , $ e );
256
- } elseif ('' !== $ content = $ chunk ->getContent ()) {
257
- if (null !== $ r ->shouldBuffer ) {
258
- throw new \LogicException ('A chunk passthru must yield an "isFirst()" chunk before any content chunk. ' );
259
- }
260
-
261
- if (null !== $ r ->content && \strlen ($ content ) !== fwrite ($ r ->content , $ content )) {
262
- $ chunk = new ErrorChunk ($ r ->offset , new TransportException (sprintf ('Failed writing %d bytes to the response buffer. ' , \strlen ($ content ))));
263
- $ r ->info ['error ' ] = $ chunk ->getError ();
264
- $ r ->response ->cancel ();
265
- }
266
- }
264
+ while (true ) {
265
+ try {
266
+ if (null !== $ chunk ) {
267
+ $ stream ->next ();
268
+ }
267
269
268
- if (null === $ chunk ->getError ()) {
269
- $ r ->offset += \strlen ($ content );
270
+ if (!$ stream ->valid ()) {
271
+ break ;
272
+ }
273
+ } catch (\Throwable $ e ) {
274
+ $ r ->info ['error ' ] = $ e ->getMessage ();
275
+ $ r ->response ->cancel ();
270
276
271
- yield $ r => $ chunk ;
277
+ yield $ r => $ chunk = new ErrorChunk ($ r ->offset , $ e );
278
+ $ chunk ->didThrow () ?: $ chunk ->getContent ();
279
+ unset($ asyncMap [$ response ]);
280
+ break ;
281
+ }
272
282
273
- if (!$ chunk ->isLast ()) {
274
- continue ;
275
- }
283
+ $ chunk = $ stream ->current ();
276
284
277
- $ stream ->next ();
285
+ if (!$ chunk instanceof ChunkInterface) {
286
+ throw new \LogicException (sprintf ('A chunk passthru must yield instances of "%s", "%s" yielded. ' , ChunkInterface::class, get_debug_type ($ chunk )));
287
+ }
278
288
279
- if ($ stream ->valid ()) {
280
- throw new \LogicException ('A chunk passthru cannot yield after an "isLast()" chunk. ' );
281
- }
289
+ if (null !== $ chunk ->getError ()) {
290
+ // no-op
291
+ } elseif ($ chunk ->isFirst ()) {
292
+ $ e = $ r ->openBuffer ();
282
293
283
- $ r ->passthru = null ;
284
- } else {
285
- if ($ chunk instanceof ErrorChunk) {
286
- $ chunk ->didThrow (false );
287
- } else {
288
- try {
289
- $ chunk = new ErrorChunk ($ chunk ->getOffset (), !$ chunk ->isTimeout () ?: $ chunk ->getError ());
290
- } catch (TransportExceptionInterface $ e ) {
291
- $ chunk = new ErrorChunk ($ chunk ->getOffset (), $ e );
292
- }
293
- }
294
+ yield $ r => $ chunk ;
294
295
295
- yield $ r => $ chunk ;
296
- $ chunk ->didThrow () ?: $ chunk ->getContent ();
297
- }
296
+ if ($ r ->initializer && null === $ r ->getInfo ('error ' )) {
297
+ // Ensure the HTTP status code is always checked
298
+ $ r ->getHeaders (true );
299
+ }
298
300
299
- unset( $ asyncMap [ $ response ]);
300
- break ;
301
+ if ( null === $ e ) {
302
+ continue ;
301
303
}
302
304
303
- $ stream = $ context = null ;
305
+ $ r ->response ->cancel ();
306
+ $ chunk = new ErrorChunk ($ r ->offset , $ e );
307
+ } elseif ('' !== $ content = $ chunk ->getContent ()) {
308
+ if (null !== $ r ->shouldBuffer ) {
309
+ throw new \LogicException ('A chunk passthru must yield an "isFirst()" chunk before any content chunk. ' );
310
+ }
304
311
305
- if ($ r ->response !== $ response && isset ($ asyncMap [$ response ])) {
306
- break ;
312
+ if (null !== $ r ->content && \strlen ($ content ) !== fwrite ($ r ->content , $ content )) {
313
+ $ chunk = new ErrorChunk ($ r ->offset , new TransportException (sprintf ('Failed writing %d bytes to the response buffer. ' , \strlen ($ content ))));
314
+ $ r ->info ['error ' ] = $ chunk ->getError ();
315
+ $ r ->response ->cancel ();
307
316
}
308
317
}
309
318
310
- if (null === $ chunk ->getError () && !$ chunk ->isLast () && $ r ->response === $ response && null !== $ r ->client ) {
311
- throw new \LogicException ('A chunk passthru must yield an "isLast()" chunk before ending a stream. ' );
312
- }
319
+ if (null === $ chunk ->getError ()) {
320
+ $ r ->offset += \strlen ($ content );
313
321
314
- $ responses = [];
315
- foreach ($ asyncMap as $ response ) {
316
- $ r = $ asyncMap [$ response ];
322
+ yield $ r => $ chunk ;
317
323
318
- if (null !== $ r ->client ) {
319
- $ responses [] = $ asyncMap [$ response ];
324
+ if (!$ chunk ->isLast ()) {
325
+ continue ;
326
+ }
327
+
328
+ $ stream ->next ();
329
+
330
+ if ($ stream ->valid ()) {
331
+ throw new \LogicException ('A chunk passthru cannot yield after an "isLast()" chunk. ' );
332
+ }
333
+
334
+ $ r ->passthru = null ;
335
+ } else {
336
+ if ($ chunk instanceof ErrorChunk) {
337
+ $ chunk ->didThrow (false );
338
+ } else {
339
+ try {
340
+ $ chunk = new ErrorChunk ($ chunk ->getOffset (), !$ chunk ->isTimeout () ?: $ chunk ->getError ());
341
+ } catch (TransportExceptionInterface $ e ) {
342
+ $ chunk = new ErrorChunk ($ chunk ->getOffset (), $ e );
343
+ }
320
344
}
345
+
346
+ yield $ r => $ chunk ;
347
+ $ chunk ->didThrow () ?: $ chunk ->getContent ();
321
348
}
349
+
350
+ unset($ asyncMap [$ response ]);
351
+ break ;
322
352
}
323
353
}
324
354
0 commit comments