@@ -1252,43 +1252,95 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1252
1252
Buffer * buffers ,
1253
1253
BlockNumber blockNum ,
1254
1254
int * nblocks ,
1255
- int flags )
1255
+ int flags ,
1256
+ bool allow_forwarding )
1256
1257
{
1257
1258
int actual_nblocks = * nblocks ;
1258
- int io_buffers_len = 0 ;
1259
1259
int maxcombine = 0 ;
1260
1260
1261
+ Assert (* nblocks == 1 || allow_forwarding );
1261
1262
Assert (* nblocks > 0 );
1262
1263
Assert (* nblocks <= MAX_IO_COMBINE_LIMIT );
1263
1264
1264
1265
for (int i = 0 ; i < actual_nblocks ; ++ i )
1265
1266
{
1266
1267
bool found ;
1267
1268
1268
- buffers [i ] = PinBufferForBlock (operation -> rel ,
1269
- operation -> smgr ,
1270
- operation -> persistence ,
1271
- operation -> forknum ,
1272
- blockNum + i ,
1273
- operation -> strategy ,
1274
- & found );
1269
+ if (allow_forwarding && buffers [i ] != InvalidBuffer )
1270
+ {
1271
+ BufferDesc * bufHdr ;
1272
+
1273
+ /*
1274
+ * This is a buffer that was pinned by an earlier call to
1275
+ * StartReadBuffers(), but couldn't be handled in one operation at
1276
+ * that time. The operation was split, and the caller has passed
1277
+ * an already pinned buffer back to us to handle the rest of the
1278
+ * operation. It must continue at the expected block number.
1279
+ */
1280
+ Assert (BufferGetBlockNumber (buffers [i ]) == blockNum + i );
1281
+
1282
+ /*
1283
+ * It might be an already valid buffer (a hit) that followed the
1284
+ * final contiguous block of an earlier I/O (a miss) marking the
1285
+ * end of it, or a buffer that some other backend has since made
1286
+ * valid by performing the I/O for us, in which case we can handle
1287
+ * it as a hit now. It is safe to check for a BM_VALID flag with
1288
+ * a relaxed load, because we got a fresh view of it while pinning
1289
+ * it in the previous call.
1290
+ *
1291
+ * On the other hand if we don't see BM_VALID yet, it must be an
1292
+ * I/O that was split by the previous call and we need to try to
1293
+ * start a new I/O from this block. We're also racing against any
1294
+ * other backend that might start the I/O or even manage to mark
1295
+ * it BM_VALID after this check, but StartBufferIO() will handle
1296
+ * those cases.
1297
+ */
1298
+ if (BufferIsLocal (buffers [i ]))
1299
+ bufHdr = GetLocalBufferDescriptor (- buffers [i ] - 1 );
1300
+ else
1301
+ bufHdr = GetBufferDescriptor (buffers [i ] - 1 );
1302
+ Assert (pg_atomic_read_u32 (& bufHdr -> state ) & BM_TAG_VALID );
1303
+ found = pg_atomic_read_u32 (& bufHdr -> state ) & BM_VALID ;
1304
+ }
1305
+ else
1306
+ {
1307
+ buffers [i ] = PinBufferForBlock (operation -> rel ,
1308
+ operation -> smgr ,
1309
+ operation -> persistence ,
1310
+ operation -> forknum ,
1311
+ blockNum + i ,
1312
+ operation -> strategy ,
1313
+ & found );
1314
+ }
1275
1315
1276
1316
if (found )
1277
1317
{
1278
1318
/*
1279
- * Terminate the read as soon as we get a hit. It could be a
1280
- * single buffer hit, or it could be a hit that follows a readable
1281
- * range. We don't want to create more than one readable range,
1282
- * so we stop here .
1319
+ * We have a hit. If it's the first block in the requested range,
1320
+ * we can return it immediately and report that WaitReadBuffers()
1321
+ * does not need to be called. If the initial value of *nblocks
1322
+ * was larger, the caller will have to call again for the rest .
1283
1323
*/
1284
- actual_nblocks = i + 1 ;
1324
+ if (i == 0 )
1325
+ {
1326
+ * nblocks = 1 ;
1327
+ return false;
1328
+ }
1329
+
1330
+ /*
1331
+ * Otherwise we already have an I/O to perform, but this block
1332
+ * can't be included as it is already valid. Split the I/O here.
1333
+ * There may or may not be more blocks requiring I/O after this
1334
+ * one, we haven't checked, but they can't be contiguous with this
1335
+ * one in the way. We'll leave this buffer pinned, forwarding it
1336
+ * to the next call, avoiding the need to unpin it here and re-pin
1337
+ * it in the next call.
1338
+ */
1339
+ actual_nblocks = i ;
1285
1340
break ;
1286
1341
}
1287
1342
else
1288
1343
{
1289
- /* Extend the readable range to cover this block. */
1290
- io_buffers_len ++ ;
1291
-
1292
1344
/*
1293
1345
* Check how many blocks we can cover with the same IO. The smgr
1294
1346
* implementation might e.g. be limited due to a segment boundary.
@@ -1309,15 +1361,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1309
1361
}
1310
1362
* nblocks = actual_nblocks ;
1311
1363
1312
- if (likely (io_buffers_len == 0 ))
1313
- return false;
1314
-
1315
1364
/* Populate information needed for I/O. */
1316
1365
operation -> buffers = buffers ;
1317
1366
operation -> blocknum = blockNum ;
1318
1367
operation -> flags = flags ;
1319
1368
operation -> nblocks = actual_nblocks ;
1320
- operation -> io_buffers_len = io_buffers_len ;
1321
1369
1322
1370
if (flags & READ_BUFFERS_ISSUE_ADVICE )
1323
1371
{
@@ -1332,7 +1380,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1332
1380
smgrprefetch (operation -> smgr ,
1333
1381
operation -> forknum ,
1334
1382
blockNum ,
1335
- operation -> io_buffers_len );
1383
+ actual_nblocks );
1336
1384
}
1337
1385
1338
1386
/* Indicate that WaitReadBuffers() should be called. */
@@ -1341,16 +1389,26 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
1341
1389
1342
1390
/*
1343
1391
* Begin reading a range of blocks beginning at blockNum and extending for
1344
- * *nblocks. On return, up to *nblocks pinned buffers holding those blocks
1345
- * are written into the buffers array, and *nblocks is updated to contain the
1346
- * actual number, which may be fewer than requested. Caller sets some of the
1347
- * members of operation; see struct definition.
1348
- *
1349
- * If false is returned, no I/O is necessary. If true is returned, one I/O
1350
- * has been started, and WaitReadBuffers() must be called with the same
1351
- * operation object before the buffers are accessed. Along with the operation
1352
- * object, the caller-supplied array of buffers must remain valid until
1353
- * WaitReadBuffers() is called.
1392
+ * *nblocks. *nblocks and the buffers array are in/out parameters. On entry,
1393
+ * the buffers elements covered by *nblocks must hold either InvalidBuffer or
1394
+ * buffers forwarded by an earlier call to StartReadBuffers() that was split
1395
+ * and is now being continued. On return, *nblocks holds the number of blocks
1396
+ * accepted by this operation. If it is less than the original number then
1397
+ * this operation has been split, but buffer elements up to the original
1398
+ * requested size may hold forwarded buffers to be used for a continuing
1399
+ * operation. The caller must either start a new I/O beginning at the block
1400
+ * immediately following the blocks accepted by this call and pass those
1401
+ * buffers back in, or release them if it chooses not to. It shouldn't make
1402
+ * any other use of or assumptions about forwarded buffers.
1403
+ *
1404
+ * If false is returned, no I/O is necessary and the buffers covered by
1405
+ * *nblocks on exit are valid and ready to be accessed. If true is returned,
1406
+ * an I/O has been started, and WaitReadBuffers() must be called with the same
1407
+ * operation object before the buffers covered by *nblocks on exit can be
1408
+ * accessed. Along with the operation object, the caller-supplied array of
1409
+ * buffers must remain valid until WaitReadBuffers() is called, and any
1410
+ * forwarded buffers must also be preserved for a continuing call unless
1411
+ * they are explicitly released.
1354
1412
*
1355
1413
* Currently the I/O is only started with optional operating system advice if
1356
1414
* requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1364,13 +1422,17 @@ StartReadBuffers(ReadBuffersOperation *operation,
1364
1422
int * nblocks ,
1365
1423
int flags )
1366
1424
{
1367
- return StartReadBuffersImpl (operation , buffers , blockNum , nblocks , flags );
1425
+ return StartReadBuffersImpl (operation , buffers , blockNum , nblocks , flags ,
1426
+ true /* expect forwarded buffers */ );
1368
1427
}
1369
1428
1370
1429
/*
1371
1430
* Single block version of the StartReadBuffers(). This might save a few
1372
1431
* instructions when called from another translation unit, because it is
1373
1432
* specialized for nblocks == 1.
1433
+ *
1434
+ * This version does not support "forwarded" buffers: they cannot be created
1435
+ * by reading only one block and *buffer is ignored on entry.
1374
1436
*/
1375
1437
bool
1376
1438
StartReadBuffer (ReadBuffersOperation * operation ,
@@ -1381,7 +1443,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
1381
1443
int nblocks = 1 ;
1382
1444
bool result ;
1383
1445
1384
- result = StartReadBuffersImpl (operation , buffer , blocknum , & nblocks , flags );
1446
+ result = StartReadBuffersImpl (operation , buffer , blocknum , & nblocks , flags ,
1447
+ false /* single block, no forwarding */ );
1385
1448
Assert (nblocks == 1 ); /* single block can't be short */
1386
1449
1387
1450
return result ;
@@ -1407,24 +1470,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
1407
1470
IOObject io_object ;
1408
1471
char persistence ;
1409
1472
1410
- /*
1411
- * Currently operations are only allowed to include a read of some range,
1412
- * with an optional extra buffer that is already pinned at the end. So
1413
- * nblocks can be at most one more than io_buffers_len.
1414
- */
1415
- Assert ((operation -> nblocks == operation -> io_buffers_len ) ||
1416
- (operation -> nblocks == operation -> io_buffers_len + 1 ));
1417
-
1418
1473
/* Find the range of the physical read we need to perform. */
1419
- nblocks = operation -> io_buffers_len ;
1420
- if (nblocks == 0 )
1421
- return ; /* nothing to do */
1422
-
1474
+ nblocks = operation -> nblocks ;
1423
1475
buffers = & operation -> buffers [0 ];
1424
1476
blocknum = operation -> blocknum ;
1425
1477
forknum = operation -> forknum ;
1426
1478
persistence = operation -> persistence ;
1427
1479
1480
+ Assert (nblocks > 0 );
1481
+ Assert (nblocks <= MAX_IO_COMBINE_LIMIT );
1482
+
1428
1483
if (persistence == RELPERSISTENCE_TEMP )
1429
1484
{
1430
1485
io_context = IOCONTEXT_NORMAL ;
0 commit comments