Skip to content

Commit ce1a75c

Browse files
committed
Support buffer forwarding in StartReadBuffers().
StartReadBuffers() reports a short read when it finds a cached block that ends a range needing I/O by updating the caller's *nblocks. It doesn't want to have to unpin the trailing hit that it knows the caller wants, so the v17 version used sleight of hand in the name of simplicity: it included it in *nblocks as if it were part of the I/O, but internally tracked the shorter real I/O size in io_buffers_len (now removed). This API change "forwards" the delimiting buffer to the next call. It's still pinned, and still stored in the caller's array, but *nblocks no longer includes stray buffers that are not really part of the operation. The expectation is that the caller still wants the rest of the blocks and will call again starting from that point, and now it can pass the already pinned buffer back in (or choose not to and release it). The change is needed for the coming asynchronous I/O version's larger version of the problem: by definition it must move BM_IO_IN_PROGRESS negotiation from WaitReadBuffers() to StartReadBuffers(), but it might already have many buffers pinned before it discovers a need to split an I/O. (The current synchronous I/O version hides that detail from callers by looping over smaller reads if required to make all covered buffers valid in WaitReadBuffers(), so it looks like one operation but it might occasionally be several under the covers.) Aside from avoiding unnecessary pin traffic, this will also be important for later work on out-of-order streams: you can't prioritize data that is already available right now if that fact is hidden from you. The new API is natural for read_stream.c (see ed0b87c). After a short read it leaves forwarded buffers where they fell in its circular queue for the continuing call to pick up. Single-block StartReadBuffer() and traditional ReadBuffer() share code but are not affected by the change. They don't do multi-block I/O. Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions) Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
1 parent ed0b87c commit ce1a75c

File tree

2 files changed

+101
-47
lines changed

2 files changed

+101
-47
lines changed

src/backend/storage/buffer/bufmgr.c

Lines changed: 101 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,43 +1252,95 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
12521252
Buffer *buffers,
12531253
BlockNumber blockNum,
12541254
int *nblocks,
1255-
int flags)
1255+
int flags,
1256+
bool allow_forwarding)
12561257
{
12571258
int actual_nblocks = *nblocks;
1258-
int io_buffers_len = 0;
12591259
int maxcombine = 0;
12601260

1261+
Assert(*nblocks == 1 || allow_forwarding);
12611262
Assert(*nblocks > 0);
12621263
Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
12631264

12641265
for (int i = 0; i < actual_nblocks; ++i)
12651266
{
12661267
bool found;
12671268

1268-
buffers[i] = PinBufferForBlock(operation->rel,
1269-
operation->smgr,
1270-
operation->persistence,
1271-
operation->forknum,
1272-
blockNum + i,
1273-
operation->strategy,
1274-
&found);
1269+
if (allow_forwarding && buffers[i] != InvalidBuffer)
1270+
{
1271+
BufferDesc *bufHdr;
1272+
1273+
/*
1274+
* This is a buffer that was pinned by an earlier call to
1275+
* StartReadBuffers(), but couldn't be handled in one operation at
1276+
* that time. The operation was split, and the caller has passed
1277+
* an already pinned buffer back to us to handle the rest of the
1278+
* operation. It must continue at the expected block number.
1279+
*/
1280+
Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
1281+
1282+
/*
1283+
* It might be an already valid buffer (a hit) that followed the
1284+
* final contiguous block of an earlier I/O (a miss) marking the
1285+
* end of it, or a buffer that some other backend has since made
1286+
* valid by performing the I/O for us, in which case we can handle
1287+
* it as a hit now. It is safe to check for a BM_VALID flag with
1288+
* a relaxed load, because we got a fresh view of it while pinning
1289+
* it in the previous call.
1290+
*
1291+
* On the other hand if we don't see BM_VALID yet, it must be an
1292+
* I/O that was split by the previous call and we need to try to
1293+
* start a new I/O from this block. We're also racing against any
1294+
* other backend that might start the I/O or even manage to mark
1295+
* it BM_VALID after this check, but StartBufferIO() will handle
1296+
* those cases.
1297+
*/
1298+
if (BufferIsLocal(buffers[i]))
1299+
bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
1300+
else
1301+
bufHdr = GetBufferDescriptor(buffers[i] - 1);
1302+
Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID);
1303+
found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
1304+
}
1305+
else
1306+
{
1307+
buffers[i] = PinBufferForBlock(operation->rel,
1308+
operation->smgr,
1309+
operation->persistence,
1310+
operation->forknum,
1311+
blockNum + i,
1312+
operation->strategy,
1313+
&found);
1314+
}
12751315

12761316
if (found)
12771317
{
12781318
/*
1279-
* Terminate the read as soon as we get a hit. It could be a
1280-
* single buffer hit, or it could be a hit that follows a readable
1281-
* range. We don't want to create more than one readable range,
1282-
* so we stop here.
1319+
* We have a hit. If it's the first block in the requested range,
1320+
* we can return it immediately and report that WaitReadBuffers()
1321+
* does not need to be called. If the initial value of *nblocks
1322+
* was larger, the caller will have to call again for the rest.
12831323
*/
1284-
actual_nblocks = i + 1;
1324+
if (i == 0)
1325+
{
1326+
*nblocks = 1;
1327+
return false;
1328+
}
1329+
1330+
/*
1331+
* Otherwise we already have an I/O to perform, but this block
1332+
* can't be included as it is already valid. Split the I/O here.
1333+
* There may or may not be more blocks requiring I/O after this
1334+
* one, we haven't checked, but they can't be contiguous with this
1335+
* one in the way. We'll leave this buffer pinned, forwarding it
1336+
* to the next call, avoiding the need to unpin it here and re-pin
1337+
* it in the next call.
1338+
*/
1339+
actual_nblocks = i;
12851340
break;
12861341
}
12871342
else
12881343
{
1289-
/* Extend the readable range to cover this block. */
1290-
io_buffers_len++;
1291-
12921344
/*
12931345
* Check how many blocks we can cover with the same IO. The smgr
12941346
* implementation might e.g. be limited due to a segment boundary.
@@ -1309,15 +1361,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
13091361
}
13101362
*nblocks = actual_nblocks;
13111363

1312-
if (likely(io_buffers_len == 0))
1313-
return false;
1314-
13151364
/* Populate information needed for I/O. */
13161365
operation->buffers = buffers;
13171366
operation->blocknum = blockNum;
13181367
operation->flags = flags;
13191368
operation->nblocks = actual_nblocks;
1320-
operation->io_buffers_len = io_buffers_len;
13211369

13221370
if (flags & READ_BUFFERS_ISSUE_ADVICE)
13231371
{
@@ -1332,7 +1380,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
13321380
smgrprefetch(operation->smgr,
13331381
operation->forknum,
13341382
blockNum,
1335-
operation->io_buffers_len);
1383+
actual_nblocks);
13361384
}
13371385

13381386
/* Indicate that WaitReadBuffers() should be called. */
@@ -1341,16 +1389,26 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
13411389

13421390
/*
13431391
* Begin reading a range of blocks beginning at blockNum and extending for
1344-
* *nblocks. On return, up to *nblocks pinned buffers holding those blocks
1345-
* are written into the buffers array, and *nblocks is updated to contain the
1346-
* actual number, which may be fewer than requested. Caller sets some of the
1347-
* members of operation; see struct definition.
1348-
*
1349-
* If false is returned, no I/O is necessary. If true is returned, one I/O
1350-
* has been started, and WaitReadBuffers() must be called with the same
1351-
* operation object before the buffers are accessed. Along with the operation
1352-
* object, the caller-supplied array of buffers must remain valid until
1353-
* WaitReadBuffers() is called.
1392+
* *nblocks. *nblocks and the buffers array are in/out parameters. On entry,
1393+
* the buffers elements covered by *nblocks must hold either InvalidBuffer or
1394+
* buffers forwarded by an earlier call to StartReadBuffers() that was split
1395+
* and is now being continued. On return, *nblocks holds the number of blocks
1396+
* accepted by this operation. If it is less than the original number then
1397+
* this operation has been split, but buffer elements up to the original
1398+
* requested size may hold forwarded buffers to be used for a continuing
1399+
* operation. The caller must either start a new I/O beginning at the block
1400+
* immediately following the blocks accepted by this call and pass those
1401+
* buffers back in, or release them if it chooses not to. It shouldn't make
1402+
* any other use of or assumptions about forwarded buffers.
1403+
*
1404+
* If false is returned, no I/O is necessary and the buffers covered by
1405+
* *nblocks on exit are valid and ready to be accessed. If true is returned,
1406+
* an I/O has been started, and WaitReadBuffers() must be called with the same
1407+
* operation object before the buffers covered by *nblocks on exit can be
1408+
* accessed. Along with the operation object, the caller-supplied array of
1409+
* buffers must remain valid until WaitReadBuffers() is called, and any
1410+
* forwarded buffers must also be preserved for a continuing call unless
1411+
* they are explicitly released.
13541412
*
13551413
* Currently the I/O is only started with optional operating system advice if
13561414
* requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1364,13 +1422,17 @@ StartReadBuffers(ReadBuffersOperation *operation,
13641422
int *nblocks,
13651423
int flags)
13661424
{
1367-
return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
1425+
return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
1426+
true /* expect forwarded buffers */ );
13681427
}
13691428

13701429
/*
13711430
* Single block version of the StartReadBuffers(). This might save a few
13721431
* instructions when called from another translation unit, because it is
13731432
* specialized for nblocks == 1.
1433+
*
1434+
* This version does not support "forwarded" buffers: they cannot be created
1435+
* by reading only one block and *buffer is ignored on entry.
13741436
*/
13751437
bool
13761438
StartReadBuffer(ReadBuffersOperation *operation,
@@ -1381,7 +1443,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
13811443
int nblocks = 1;
13821444
bool result;
13831445

1384-
result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
1446+
result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
1447+
false /* single block, no forwarding */ );
13851448
Assert(nblocks == 1); /* single block can't be short */
13861449

13871450
return result;
@@ -1407,24 +1470,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
14071470
IOObject io_object;
14081471
char persistence;
14091472

1410-
/*
1411-
* Currently operations are only allowed to include a read of some range,
1412-
* with an optional extra buffer that is already pinned at the end. So
1413-
* nblocks can be at most one more than io_buffers_len.
1414-
*/
1415-
Assert((operation->nblocks == operation->io_buffers_len) ||
1416-
(operation->nblocks == operation->io_buffers_len + 1));
1417-
14181473
/* Find the range of the physical read we need to perform. */
1419-
nblocks = operation->io_buffers_len;
1420-
if (nblocks == 0)
1421-
return; /* nothing to do */
1422-
1474+
nblocks = operation->nblocks;
14231475
buffers = &operation->buffers[0];
14241476
blocknum = operation->blocknum;
14251477
forknum = operation->forknum;
14261478
persistence = operation->persistence;
14271479

1480+
Assert(nblocks > 0);
1481+
Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
1482+
14281483
if (persistence == RELPERSISTENCE_TEMP)
14291484
{
14301485
io_context = IOCONTEXT_NORMAL;

src/include/storage/bufmgr.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ struct ReadBuffersOperation
130130
BlockNumber blocknum;
131131
int flags;
132132
int16 nblocks;
133-
int16 io_buffers_len;
134133
};
135134

136135
typedef struct ReadBuffersOperation ReadBuffersOperation;

0 commit comments

Comments
 (0)