Skip to content

Commit ed0b87c

Browse files
committed
Support buffer forwarding in read_stream.c.
In preparation for a follow-up change to the buffer manager, teach read_stream.c to manage buffers "forwarded" from one StartReadBuffers() call to the next after a short read. This involves a small amount of extra book-keeping, and opens the way for lower levels to split I/O operations without having to drop pins, as required for efficient handling of various edge cases. Concretely, the "buffers" argument will change from an out parameter to an in/out parameter. Buffer queue elements must be initialized on first use and cleared after they're consumed, but forwarded buffers are left where they fall ahead of the current pending read in the queue, ready for use by the operation that continues where a short read left off. The stream also needs to count them for pin limit management and release them on reset/early end. Tested-by: Andres Freund <andres@anarazel.de> (earlier versions) Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
1 parent 14413d0 commit ed0b87c

File tree

1 file changed

+78
-11
lines changed

1 file changed

+78
-11
lines changed

src/backend/storage/aio/read_stream.c

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,10 @@ struct ReadStream
9595
int16 ios_in_progress;
9696
int16 queue_size;
9797
int16 max_pinned_buffers;
98+
int16 forwarded_buffers;
9899
int16 pinned_buffers;
99100
int16 distance;
101+
int16 initialized_buffers;
100102
bool advice_enabled;
101103
bool temporary;
102104

@@ -224,8 +226,10 @@ static bool
224226
read_stream_start_pending_read(ReadStream *stream)
225227
{
226228
bool need_wait;
229+
int requested_nblocks;
227230
int nblocks;
228231
int flags;
232+
int forwarded;
229233
int16 io_index;
230234
int16 overflow;
231235
int16 buffer_index;
@@ -272,11 +276,21 @@ read_stream_start_pending_read(ReadStream *stream)
272276
}
273277
}
274278

275-
/* How many more buffers is this backend allowed? */
279+
/*
280+
* How many more buffers is this backend allowed?
281+
*
282+
* Forwarded buffers are already pinned and map to the leading blocks of
283+
* the pending read (the remaining portion of an earlier short read that
284+
* we're about to continue). They are not counted in pinned_buffers, but
285+
* they are counted as pins already held by this backend according to the
286+
* buffer manager, so they must be added to the limit it grants us.
287+
*/
276288
if (stream->temporary)
277289
buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
278290
else
279291
buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
292+
Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
293+
buffer_limit += stream->forwarded_buffers;
280294
if (buffer_limit == 0 && stream->pinned_buffers == 0)
281295
buffer_limit = 1; /* guarantee progress */
282296

@@ -301,10 +315,16 @@ read_stream_start_pending_read(ReadStream *stream)
301315

302316
/*
303317
* We say how many blocks we want to read, but it may be smaller on return
304-
* if the buffer manager decides to shorten the read.
318+
* if the buffer manager decides to shorten the read. Initialize buffers
319+
* to InvalidBuffer (= not a forwarded buffer) as input on first use only,
320+
* and keep the original nblocks number so we can check for forwarded
321+
* buffers as output, below.
305322
*/
306323
buffer_index = stream->next_buffer_index;
307324
io_index = stream->next_io_index;
325+
while (stream->initialized_buffers < buffer_index + nblocks)
326+
stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
327+
requested_nblocks = nblocks;
308328
need_wait = StartReadBuffers(&stream->ios[io_index].op,
309329
&stream->buffers[buffer_index],
310330
stream->pending_read_blocknum,
@@ -333,16 +353,35 @@ read_stream_start_pending_read(ReadStream *stream)
333353
stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
334354
}
335355

356+
/*
357+
* How many pins were acquired but forwarded to the next call? These need
358+
* to be passed to the next StartReadBuffers() call by leaving them
359+
* exactly where they are in the queue, or released if the stream ends
360+
* early. We need the number for accounting purposes, since they are not
361+
* counted in stream->pinned_buffers but we already hold them.
362+
*/
363+
forwarded = 0;
364+
while (nblocks + forwarded < requested_nblocks &&
365+
stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
366+
forwarded++;
367+
stream->forwarded_buffers = forwarded;
368+
336369
/*
337370
* We gave a contiguous range of buffer space to StartReadBuffers(), but
338-
* we want it to wrap around at queue_size. Slide overflowing buffers to
339-
* the front of the array.
371+
* we want it to wrap around at queue_size. Copy overflowing buffers to
372+
* the front of the array where they'll be consumed, but also leave a copy
373+
* in the overflow zone which the I/O operation has a pointer to (it needs
374+
* a contiguous array). Both copies will be cleared when the buffers are
375+
* handed to the consumer.
340376
*/
341-
overflow = (buffer_index + nblocks) - stream->queue_size;
377+
overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
342378
if (overflow > 0)
343-
memmove(&stream->buffers[0],
344-
&stream->buffers[stream->queue_size],
345-
sizeof(stream->buffers[0]) * overflow);
379+
{
380+
Assert(overflow < stream->queue_size); /* can't overlap */
381+
memcpy(&stream->buffers[0],
382+
&stream->buffers[stream->queue_size],
383+
sizeof(stream->buffers[0]) * overflow);
384+
}
346385

347386
/* Compute location of start of next read, without using % operator. */
348387
buffer_index += nblocks;
@@ -719,10 +758,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
719758

720759
/* Fast path assumptions. */
721760
Assert(stream->ios_in_progress == 0);
761+
Assert(stream->forwarded_buffers == 0);
722762
Assert(stream->pinned_buffers == 1);
723763
Assert(stream->distance == 1);
724764
Assert(stream->pending_read_nblocks == 0);
725765
Assert(stream->per_buffer_data_size == 0);
766+
Assert(stream->initialized_buffers > stream->oldest_buffer_index);
726767

727768
/* We're going to return the buffer we pinned last time. */
728769
oldest_buffer_index = stream->oldest_buffer_index;
@@ -771,6 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
771812
stream->distance = 0;
772813
stream->oldest_buffer_index = stream->next_buffer_index;
773814
stream->pinned_buffers = 0;
815+
stream->buffers[oldest_buffer_index] = InvalidBuffer;
774816
}
775817

776818
stream->fast_path = false;
@@ -846,10 +888,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
846888
stream->seq_until_processed = InvalidBlockNumber;
847889
}
848890

849-
#ifdef CLOBBER_FREED_MEMORY
850-
/* Clobber old buffer for debugging purposes. */
891+
/*
892+
* We must zap this queue entry, or else it would appear as a forwarded
893+
* buffer. If it's potentially in the overflow zone (ie from a
894+
* multi-block I/O that wrapped around the queue), also zap the copy.
895+
*/
851896
stream->buffers[oldest_buffer_index] = InvalidBuffer;
852-
#endif
897+
if (oldest_buffer_index < stream->io_combine_limit - 1)
898+
stream->buffers[stream->queue_size + oldest_buffer_index] =
899+
InvalidBuffer;
853900

854901
#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
855902

@@ -894,6 +941,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
894941
#ifndef READ_STREAM_DISABLE_FAST_PATH
895942
/* See if we can take the fast path for all-cached scans next time. */
896943
if (stream->ios_in_progress == 0 &&
944+
stream->forwarded_buffers == 0 &&
897945
stream->pinned_buffers == 1 &&
898946
stream->distance == 1 &&
899947
stream->pending_read_nblocks == 0 &&
@@ -929,6 +977,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
929977
void
930978
read_stream_reset(ReadStream *stream)
931979
{
980+
int16 index;
932981
Buffer buffer;
933982

934983
/* Stop looking ahead. */
@@ -942,6 +991,24 @@ read_stream_reset(ReadStream *stream)
942991
while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
943992
ReleaseBuffer(buffer);
944993

994+
/* Unpin any unused forwarded buffers. */
995+
index = stream->next_buffer_index;
996+
while (index < stream->initialized_buffers &&
997+
(buffer = stream->buffers[index]) != InvalidBuffer)
998+
{
999+
Assert(stream->forwarded_buffers > 0);
1000+
stream->forwarded_buffers--;
1001+
ReleaseBuffer(buffer);
1002+
1003+
stream->buffers[index] = InvalidBuffer;
1004+
if (index < stream->io_combine_limit - 1)
1005+
stream->buffers[stream->queue_size + index] = InvalidBuffer;
1006+
1007+
if (++index == stream->queue_size)
1008+
index = 0;
1009+
}
1010+
1011+
Assert(stream->forwarded_buffers == 0);
9451012
Assert(stream->pinned_buffers == 0);
9461013
Assert(stream->ios_in_progress == 0);
9471014

0 commit comments

Comments
 (0)