@@ -95,8 +95,10 @@ struct ReadStream
95
95
int16 ios_in_progress ;
96
96
int16 queue_size ;
97
97
int16 max_pinned_buffers ;
98
+ int16 forwarded_buffers ;
98
99
int16 pinned_buffers ;
99
100
int16 distance ;
101
+ int16 initialized_buffers ;
100
102
bool advice_enabled ;
101
103
bool temporary ;
102
104
@@ -224,8 +226,10 @@ static bool
224
226
read_stream_start_pending_read (ReadStream * stream )
225
227
{
226
228
bool need_wait ;
229
+ int requested_nblocks ;
227
230
int nblocks ;
228
231
int flags ;
232
+ int forwarded ;
229
233
int16 io_index ;
230
234
int16 overflow ;
231
235
int16 buffer_index ;
@@ -272,11 +276,21 @@ read_stream_start_pending_read(ReadStream *stream)
272
276
}
273
277
}
274
278
275
- /* How many more buffers is this backend allowed? */
279
+ /*
280
+ * How many more buffers is this backend allowed?
281
+ *
282
+ * Forwarded buffers are already pinned and map to the leading blocks of
283
+ * the pending read (the remaining portion of an earlier short read that
284
+ * we're about to continue). They are not counted in pinned_buffers, but
285
+ * they are counted as pins already held by this backend according to the
286
+ * buffer manager, so they must be added to the limit it grants us.
287
+ */
276
288
if (stream -> temporary )
277
289
buffer_limit = Min (GetAdditionalLocalPinLimit (), PG_INT16_MAX );
278
290
else
279
291
buffer_limit = Min (GetAdditionalPinLimit (), PG_INT16_MAX );
292
+ Assert (stream -> forwarded_buffers <= stream -> pending_read_nblocks );
293
+ buffer_limit += stream -> forwarded_buffers ;
280
294
if (buffer_limit == 0 && stream -> pinned_buffers == 0 )
281
295
buffer_limit = 1 ; /* guarantee progress */
282
296
@@ -301,10 +315,16 @@ read_stream_start_pending_read(ReadStream *stream)
301
315
302
316
/*
303
317
* We say how many blocks we want to read, but it may be smaller on return
304
- * if the buffer manager decides to shorten the read.
318
+ * if the buffer manager decides to shorten the read. Initialize buffers
319
+ * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
320
+ * and keep the original nblocks number so we can check for forwarded
321
+ * buffers as output, below.
305
322
*/
306
323
buffer_index = stream -> next_buffer_index ;
307
324
io_index = stream -> next_io_index ;
325
+ while (stream -> initialized_buffers < buffer_index + nblocks )
326
+ stream -> buffers [stream -> initialized_buffers ++ ] = InvalidBuffer ;
327
+ requested_nblocks = nblocks ;
308
328
need_wait = StartReadBuffers (& stream -> ios [io_index ].op ,
309
329
& stream -> buffers [buffer_index ],
310
330
stream -> pending_read_blocknum ,
@@ -333,16 +353,35 @@ read_stream_start_pending_read(ReadStream *stream)
333
353
stream -> seq_blocknum = stream -> pending_read_blocknum + nblocks ;
334
354
}
335
355
356
+ /*
357
+ * How many pins were acquired but forwarded to the next call? These need
358
+ * to be passed to the next StartReadBuffers() call by leaving them
359
+ * exactly where they are in the queue, or released if the stream ends
360
+ * early. We need the number for accounting purposes, since they are not
361
+ * counted in stream->pinned_buffers but we already hold them.
362
+ */
363
+ forwarded = 0 ;
364
+ while (nblocks + forwarded < requested_nblocks &&
365
+ stream -> buffers [buffer_index + nblocks + forwarded ] != InvalidBuffer )
366
+ forwarded ++ ;
367
+ stream -> forwarded_buffers = forwarded ;
368
+
336
369
/*
337
370
* We gave a contiguous range of buffer space to StartReadBuffers(), but
338
- * we want it to wrap around at queue_size. Slide overflowing buffers to
339
- * the front of the array.
371
+ * we want it to wrap around at queue_size. Copy overflowing buffers to
372
+ * the front of the array where they'll be consumed, but also leave a copy
373
+ * in the overflow zone which the I/O operation has a pointer to (it needs
374
+ * a contiguous array). Both copies will be cleared when the buffers are
375
+ * handed to the consumer.
340
376
*/
341
- overflow = (buffer_index + nblocks ) - stream -> queue_size ;
377
+ overflow = (buffer_index + nblocks + forwarded ) - stream -> queue_size ;
342
378
if (overflow > 0 )
343
- memmove (& stream -> buffers [0 ],
344
- & stream -> buffers [stream -> queue_size ],
345
- sizeof (stream -> buffers [0 ]) * overflow );
379
+ {
380
+ Assert (overflow < stream -> queue_size ); /* can't overlap */
381
+ memcpy (& stream -> buffers [0 ],
382
+ & stream -> buffers [stream -> queue_size ],
383
+ sizeof (stream -> buffers [0 ]) * overflow );
384
+ }
346
385
347
386
/* Compute location of start of next read, without using % operator. */
348
387
buffer_index += nblocks ;
@@ -719,10 +758,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
719
758
720
759
/* Fast path assumptions. */
721
760
Assert (stream -> ios_in_progress == 0 );
761
+ Assert (stream -> forwarded_buffers == 0 );
722
762
Assert (stream -> pinned_buffers == 1 );
723
763
Assert (stream -> distance == 1 );
724
764
Assert (stream -> pending_read_nblocks == 0 );
725
765
Assert (stream -> per_buffer_data_size == 0 );
766
+ Assert (stream -> initialized_buffers > stream -> oldest_buffer_index );
726
767
727
768
/* We're going to return the buffer we pinned last time. */
728
769
oldest_buffer_index = stream -> oldest_buffer_index ;
@@ -771,6 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
771
812
stream -> distance = 0 ;
772
813
stream -> oldest_buffer_index = stream -> next_buffer_index ;
773
814
stream -> pinned_buffers = 0 ;
815
+ stream -> buffers [oldest_buffer_index ] = InvalidBuffer ;
774
816
}
775
817
776
818
stream -> fast_path = false;
@@ -846,10 +888,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
846
888
stream -> seq_until_processed = InvalidBlockNumber ;
847
889
}
848
890
849
- #ifdef CLOBBER_FREED_MEMORY
850
- /* Clobber old buffer for debugging purposes. */
891
+ /*
892
+ * We must zap this queue entry, or else it would appear as a forwarded
893
+ * buffer. If it's potentially in the overflow zone (ie from a
894
+ * multi-block I/O that wrapped around the queue), also zap the copy.
895
+ */
851
896
stream -> buffers [oldest_buffer_index ] = InvalidBuffer ;
852
- #endif
897
+ if (oldest_buffer_index < stream -> io_combine_limit - 1 )
898
+ stream -> buffers [stream -> queue_size + oldest_buffer_index ] =
899
+ InvalidBuffer ;
853
900
854
901
#if defined(CLOBBER_FREED_MEMORY ) || defined(USE_VALGRIND )
855
902
@@ -894,6 +941,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
894
941
#ifndef READ_STREAM_DISABLE_FAST_PATH
895
942
/* See if we can take the fast path for all-cached scans next time. */
896
943
if (stream -> ios_in_progress == 0 &&
944
+ stream -> forwarded_buffers == 0 &&
897
945
stream -> pinned_buffers == 1 &&
898
946
stream -> distance == 1 &&
899
947
stream -> pending_read_nblocks == 0 &&
@@ -929,6 +977,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
929
977
void
930
978
read_stream_reset (ReadStream * stream )
931
979
{
980
+ int16 index ;
932
981
Buffer buffer ;
933
982
934
983
/* Stop looking ahead. */
@@ -942,6 +991,24 @@ read_stream_reset(ReadStream *stream)
942
991
while ((buffer = read_stream_next_buffer (stream , NULL )) != InvalidBuffer )
943
992
ReleaseBuffer (buffer );
944
993
994
+ /* Unpin any unused forwarded buffers. */
995
+ index = stream -> next_buffer_index ;
996
+ while (index < stream -> initialized_buffers &&
997
+ (buffer = stream -> buffers [index ]) != InvalidBuffer )
998
+ {
999
+ Assert (stream -> forwarded_buffers > 0 );
1000
+ stream -> forwarded_buffers -- ;
1001
+ ReleaseBuffer (buffer );
1002
+
1003
+ stream -> buffers [index ] = InvalidBuffer ;
1004
+ if (index < stream -> io_combine_limit - 1 )
1005
+ stream -> buffers [stream -> queue_size + index ] = InvalidBuffer ;
1006
+
1007
+ if (++ index == stream -> queue_size )
1008
+ index = 0 ;
1009
+ }
1010
+
1011
+ Assert (stream -> forwarded_buffers == 0 );
945
1012
Assert (stream -> pinned_buffers == 0 );
946
1013
Assert (stream -> ios_in_progress == 0 );
947
1014
0 commit comments