@@ -469,6 +469,7 @@ typedef struct XLogCtlData
469
469
XLogRecPtr lastSegSwitchLSN ;
470
470
471
471
/* These are accessed using atomics -- info_lck not needed */
472
+ pg_atomic_uint64 logInsertResult ; /* last byte + 1 inserted to buffers */
472
473
pg_atomic_uint64 logWriteResult ; /* last byte + 1 written out */
473
474
pg_atomic_uint64 logFlushResult ; /* last byte + 1 flushed */
474
475
@@ -1499,6 +1500,7 @@ static XLogRecPtr
1499
1500
WaitXLogInsertionsToFinish (XLogRecPtr upto )
1500
1501
{
1501
1502
uint64 bytepos ;
1503
+ XLogRecPtr inserted ;
1502
1504
XLogRecPtr reservedUpto ;
1503
1505
XLogRecPtr finishedUpto ;
1504
1506
XLogCtlInsert * Insert = & XLogCtl -> Insert ;
@@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
1507
1509
if (MyProc == NULL )
1508
1510
elog (PANIC , "cannot wait without a PGPROC structure" );
1509
1511
1512
+ /*
1513
+ * Check if there's any work to do. Use a barrier to ensure we get the
1514
+ * freshest value.
1515
+ */
1516
+ inserted = pg_atomic_read_membarrier_u64 (& XLogCtl -> logInsertResult );
1517
+ if (upto <= inserted )
1518
+ return inserted ;
1519
+
1510
1520
/* Read the current insert position */
1511
1521
SpinLockAcquire (& Insert -> insertpos_lck );
1512
1522
bytepos = Insert -> CurrBytePos ;
@@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
1586
1596
if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto )
1587
1597
finishedUpto = insertingat ;
1588
1598
}
1599
+
1600
+ /*
1601
+ * Advance the limit we know to have been inserted and return the freshest
1602
+ * value we know of, which might be beyond what we requested if somebody
1603
+ * is concurrently doing this with an 'upto' pointer ahead of us.
1604
+ */
1605
+ finishedUpto = pg_atomic_monotonic_advance_u64 (& XLogCtl -> logInsertResult ,
1606
+ finishedUpto );
1607
+
1589
1608
return finishedUpto ;
1590
1609
}
1591
1610
@@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
1727
1746
{
1728
1747
char * pdst = dstbuf ;
1729
1748
XLogRecPtr recptr = startptr ;
1749
+ XLogRecPtr inserted ;
1730
1750
Size nbytes = count ;
1731
1751
1732
1752
if (RecoveryInProgress () || tli != GetWALInsertionTimeLine ())
1733
1753
return 0 ;
1734
1754
1735
1755
Assert (!XLogRecPtrIsInvalid (startptr ));
1736
- Assert (startptr + count <= LogwrtResult .Write );
1756
+
1757
+ /*
1758
+ * Caller should ensure that the requested data has been inserted into WAL
1759
+ * buffers before we try to read it.
1760
+ */
1761
+ inserted = pg_atomic_read_u64 (& XLogCtl -> logInsertResult );
1762
+ if (startptr + count > inserted )
1763
+ ereport (ERROR ,
1764
+ errmsg ("cannot read past end of generated WAL: requested %X/%X, current position %X/%X" ,
1765
+ LSN_FORMAT_ARGS (startptr + count ),
1766
+ LSN_FORMAT_ARGS (inserted )));
1737
1767
1738
1768
/*
1739
1769
* Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
2571
2601
{
2572
2602
XLogRecPtr Flush ;
2573
2603
XLogRecPtr Write ;
2604
+ XLogRecPtr Insert ;
2574
2605
2575
2606
Flush = pg_atomic_read_u64 (& XLogCtl -> logFlushResult );
2576
2607
pg_read_barrier ();
2577
2608
Write = pg_atomic_read_u64 (& XLogCtl -> logWriteResult );
2609
+ pg_read_barrier ();
2610
+ Insert = pg_atomic_read_u64 (& XLogCtl -> logInsertResult );
2578
2611
2579
2612
/* WAL written to disk is always ahead of WAL flushed */
2580
2613
Assert (Write >= Flush );
2614
+
2615
+ /* WAL inserted to buffers is always ahead of WAL written */
2616
+ Assert (Insert >= Write );
2581
2617
}
2582
2618
#endif
2583
2619
}
@@ -4951,6 +4987,7 @@ XLOGShmemInit(void)
4951
4987
4952
4988
SpinLockInit (& XLogCtl -> Insert .insertpos_lck );
4953
4989
SpinLockInit (& XLogCtl -> info_lck );
4990
+ pg_atomic_init_u64 (& XLogCtl -> logInsertResult , InvalidXLogRecPtr );
4954
4991
pg_atomic_init_u64 (& XLogCtl -> logWriteResult , InvalidXLogRecPtr );
4955
4992
pg_atomic_init_u64 (& XLogCtl -> logFlushResult , InvalidXLogRecPtr );
4956
4993
pg_atomic_init_u64 (& XLogCtl -> unloggedLSN , InvalidXLogRecPtr );
@@ -5979,6 +6016,7 @@ StartupXLOG(void)
5979
6016
* because no other process can be reading or writing WAL yet.
5980
6017
*/
5981
6018
LogwrtResult .Write = LogwrtResult .Flush = EndOfLog ;
6019
+ pg_atomic_write_u64 (& XLogCtl -> logInsertResult , EndOfLog );
5982
6020
pg_atomic_write_u64 (& XLogCtl -> logWriteResult , EndOfLog );
5983
6021
pg_atomic_write_u64 (& XLogCtl -> logFlushResult , EndOfLog );
5984
6022
XLogCtl -> LogwrtRqst .Write = EndOfLog ;
0 commit comments