Skip to content

Commit f3ff7bf

Browse files
committed
Add XLogCtl->logInsertResult
This tracks the position of WAL that's been fully copied into WAL buffers by all processes emitting WAL. (For some reason we call that "WAL insertion"). This is updated using atomic monotonic advance during WaitXLogInsertionsToFinish, which is not when the insertions actually occur, but it's the only place where we know where have all the insertions have completed. This value is useful in WALReadFromBuffers, which can verify that callers don't try to read past what has been inserted. (However, more infrastructure is needed in order to actually use WAL after the flush point, since it could be lost.) The value is also useful in WaitXLogInsertionsToFinish() itself, since we can now exit quickly when all WAL has been already inserted, without even having to take any locks.
1 parent 29f6a95 commit f3ff7bf

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

src/backend/access/transam/xlog.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
469469
XLogRecPtr lastSegSwitchLSN;
470470

471471
/* These are accessed using atomics -- info_lck not needed */
472+
pg_atomic_uint64 logInsertResult; /* last byte + 1 inserted to buffers */
472473
pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */
473474
pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
474475

@@ -1499,6 +1500,7 @@ static XLogRecPtr
14991500
WaitXLogInsertionsToFinish(XLogRecPtr upto)
15001501
{
15011502
uint64 bytepos;
1503+
XLogRecPtr inserted;
15021504
XLogRecPtr reservedUpto;
15031505
XLogRecPtr finishedUpto;
15041506
XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
15071509
if (MyProc == NULL)
15081510
elog(PANIC, "cannot wait without a PGPROC structure");
15091511

1512+
/*
1513+
* Check if there's any work to do. Use a barrier to ensure we get the
1514+
* freshest value.
1515+
*/
1516+
inserted = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
1517+
if (upto <= inserted)
1518+
return inserted;
1519+
15101520
/* Read the current insert position */
15111521
SpinLockAcquire(&Insert->insertpos_lck);
15121522
bytepos = Insert->CurrBytePos;
@@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
15861596
if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
15871597
finishedUpto = insertingat;
15881598
}
1599+
1600+
/*
1601+
* Advance the limit we know to have been inserted and return the freshest
1602+
* value we know of, which might be beyond what we requested if somebody
1603+
* is concurrently doing this with an 'upto' pointer ahead of us.
1604+
*/
1605+
finishedUpto = pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult,
1606+
finishedUpto);
1607+
15891608
return finishedUpto;
15901609
}
15911610

@@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
17271746
{
17281747
char *pdst = dstbuf;
17291748
XLogRecPtr recptr = startptr;
1749+
XLogRecPtr inserted;
17301750
Size nbytes = count;
17311751

17321752
if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
17331753
return 0;
17341754

17351755
Assert(!XLogRecPtrIsInvalid(startptr));
1736-
Assert(startptr + count <= LogwrtResult.Write);
1756+
1757+
/*
1758+
* Caller should ensure that the requested data has been inserted into WAL
1759+
* buffers before we try to read it.
1760+
*/
1761+
inserted = pg_atomic_read_u64(&XLogCtl->logInsertResult);
1762+
if (startptr + count > inserted)
1763+
ereport(ERROR,
1764+
errmsg("cannot read past end of generated WAL: requested %X/%X, current position %X/%X",
1765+
LSN_FORMAT_ARGS(startptr + count),
1766+
LSN_FORMAT_ARGS(inserted)));
17371767

17381768
/*
17391769
* Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
25712601
{
25722602
XLogRecPtr Flush;
25732603
XLogRecPtr Write;
2604+
XLogRecPtr Insert;
25742605

25752606
Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
25762607
pg_read_barrier();
25772608
Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
2609+
pg_read_barrier();
2610+
Insert = pg_atomic_read_u64(&XLogCtl->logInsertResult);
25782611

25792612
/* WAL written to disk is always ahead of WAL flushed */
25802613
Assert(Write >= Flush);
2614+
2615+
/* WAL inserted to buffers is always ahead of WAL written */
2616+
Assert(Insert >= Write);
25812617
}
25822618
#endif
25832619
}
@@ -4951,6 +4987,7 @@ XLOGShmemInit(void)
49514987

49524988
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
49534989
SpinLockInit(&XLogCtl->info_lck);
4990+
pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
49544991
pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
49554992
pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
49564993
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5979,6 +6016,7 @@ StartupXLOG(void)
59796016
* because no other process can be reading or writing WAL yet.
59806017
*/
59816018
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
6019+
pg_atomic_write_u64(&XLogCtl->logInsertResult, EndOfLog);
59826020
pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
59836021
pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
59846022
XLogCtl->LogwrtRqst.Write = EndOfLog;

src/include/port/atomics.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
570570
return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
571571
}
572572

573+
/*
574+
* Monotonically advance the given variable using only atomic operations until
575+
* it's at least the target value. Returns the latest value observed, which
576+
* may or may not be the target value.
577+
*
578+
* Full barrier semantics (even when value is unchanged).
579+
*/
580+
static inline uint64
581+
pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
582+
{
583+
uint64 currval;
584+
585+
#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
586+
AssertPointerAlignment(ptr, 8);
587+
#endif
588+
589+
currval = pg_atomic_read_u64_impl(ptr);
590+
if (currval >= target_)
591+
{
592+
pg_memory_barrier();
593+
return currval;
594+
}
595+
596+
#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
597+
AssertPointerAlignment(&currval, 8);
598+
#endif
599+
600+
while (currval < target_)
601+
{
602+
if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
603+
break;
604+
}
605+
606+
return Max(target_, currval);
607+
}
608+
573609
#undef INSIDE_ATOMICS_H
574610

575611
#endif /* ATOMICS_H */

0 commit comments

Comments
 (0)