Skip to content

Commit ee1cbe8

Browse files
committed
Operate XLogCtl->log{Write,Flush}Result with atomics
This removes the need to hold both the info_lck spinlock and WALWriteLock to update them. We use stock atomic write instead, with WALWriteLock held. Readers can use atomic read, without any locking. This allows for some code to be reordered: some places were a bit contorted to avoid repeated spinlock acquisition, but that's no longer a concern, so we can turn them to more natural coding. Some further changes are possible (maybe to performance wins), but in this commit I did rather minimal ones only, to avoid increasing the blast radius. Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Reviewed-by: Jeff Davis <pgsql@j-davis.com> Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions) Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
1 parent 6f132ed commit ee1cbe8

File tree

1 file changed

+59
-48
lines changed
  • src/backend/access/transam

1 file changed

+59
-48
lines changed

src/backend/access/transam/xlog.c

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -292,12 +292,7 @@ static bool doPageWrites;
292292
* LogwrtRqst indicates a byte position that we need to write and/or fsync
293293
* the log up to (all records before that point must be written or fsynced).
294294
* The positions already written/fsynced are maintained in logWriteResult
295-
* and logFlushResult.
296-
*
297-
* To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
298-
* info_lck or WALWriteLock. To update them, you need to hold both locks.
299-
* The point of this arrangement is that the value can be examined by code
300-
* that already holds WALWriteLock without needing to grab info_lck as well.
295+
* and logFlushResult using atomic access.
301296
* In addition to the shared variable, each backend has a private copy of
302297
* both in LogwrtResult, which is updated when convenient.
303298
*
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
473468
pg_time_t lastSegSwitchTime;
474469
XLogRecPtr lastSegSwitchLSN;
475470

476-
/*
477-
* Protected by info_lck and WALWriteLock (you must hold either lock to
478-
* read it, but both to update)
479-
*/
480-
XLogRecPtr logWriteResult; /* last byte + 1 written out */
481-
XLogRecPtr logFlushResult; /* last byte + 1 flushed */
471+
/* These are accessed using atomics -- info_lck not needed */
472+
pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */
473+
pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
482474

483475
/*
484476
* Latest initialized page in the cache (last byte position + 1).
@@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0};
616608

617609
/*
618610
* Update local copy of shared XLogCtl->log{Write,Flush}Result
611+
*
612+
* It's critical that Flush always trails Write, so the order of the reads is
613+
* important, as is the barrier. See also XLogWrite.
619614
*/
620615
#define RefreshXLogWriteResult(_target) \
621616
do { \
622-
_target.Write = XLogCtl->logWriteResult; \
623-
_target.Flush = XLogCtl->logFlushResult; \
617+
_target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \
618+
pg_read_barrier(); \
619+
_target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \
624620
} while (0)
625621

626622
/*
@@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
968964
/* advance global request to include new block(s) */
969965
if (XLogCtl->LogwrtRqst.Write < EndPos)
970966
XLogCtl->LogwrtRqst.Write = EndPos;
971-
/* update local result copy while I have the chance */
972-
RefreshXLogWriteResult(LogwrtResult);
973967
SpinLockRelease(&XLogCtl->info_lck);
968+
RefreshXLogWriteResult(LogwrtResult);
974969
}
975970

976971
/*
@@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
19891984
if (opportunistic)
19901985
break;
19911986

1992-
/* Before waiting, get info_lck and update LogwrtResult */
1987+
/* Advance shared memory write request position */
19931988
SpinLockAcquire(&XLogCtl->info_lck);
19941989
if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
19951990
XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
1996-
RefreshXLogWriteResult(LogwrtResult);
19971991
SpinLockRelease(&XLogCtl->info_lck);
19981992

19991993
/*
2000-
* Now that we have an up-to-date LogwrtResult value, see if we
2001-
* still need to write it or if someone else already did.
1994+
* Acquire an up-to-date LogwrtResult value and see if we still
1995+
* need to write it or if someone else already did.
20021996
*/
1997+
RefreshXLogWriteResult(LogwrtResult);
20031998
if (LogwrtResult.Write < OldPageRqstPtr)
20041999
{
20052000
/*
@@ -2556,16 +2551,35 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
25562551
* 'result' values. This is not absolutely essential, but it saves some
25572552
* code in a couple of places.
25582553
*/
2554+
SpinLockAcquire(&XLogCtl->info_lck);
2555+
if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
2556+
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
2557+
if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
2558+
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
2559+
SpinLockRelease(&XLogCtl->info_lck);
2560+
2561+
/*
2562+
* We write Write first, bar, then Flush. When reading, the opposite must
2563+
* be done (with a matching barrier in between), so that we always see a
2564+
* Flush value that trails behind the Write value seen.
2565+
*/
2566+
pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write);
2567+
pg_write_barrier();
2568+
pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush);
2569+
2570+
#ifdef USE_ASSERT_CHECKING
25592571
{
2560-
SpinLockAcquire(&XLogCtl->info_lck);
2561-
XLogCtl->logWriteResult = LogwrtResult.Write;
2562-
XLogCtl->logFlushResult = LogwrtResult.Flush;
2563-
if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
2564-
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
2565-
if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
2566-
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
2567-
SpinLockRelease(&XLogCtl->info_lck);
2572+
XLogRecPtr Flush;
2573+
XLogRecPtr Write;
2574+
2575+
Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
2576+
pg_read_barrier();
2577+
Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
2578+
2579+
/* WAL written to disk is always ahead of WAL flushed */
2580+
Assert(Write >= Flush);
25682581
}
2582+
#endif
25692583
}
25702584

25712585
/*
@@ -2582,7 +2596,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
25822596
XLogRecPtr prevAsyncXactLSN;
25832597

25842598
SpinLockAcquire(&XLogCtl->info_lck);
2585-
RefreshXLogWriteResult(LogwrtResult);
25862599
sleeping = XLogCtl->WalWriterSleeping;
25872600
prevAsyncXactLSN = XLogCtl->asyncXactLSN;
25882601
if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2608,6 +2621,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
26082621
{
26092622
int flushblocks;
26102623

2624+
RefreshXLogWriteResult(LogwrtResult);
2625+
26112626
flushblocks =
26122627
WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
26132628

@@ -2790,21 +2805,19 @@ XLogFlush(XLogRecPtr record)
27902805
{
27912806
XLogRecPtr insertpos;
27922807

2793-
/* read LogwrtResult and update local state */
2794-
SpinLockAcquire(&XLogCtl->info_lck);
2795-
if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
2796-
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
2797-
RefreshXLogWriteResult(LogwrtResult);
2798-
SpinLockRelease(&XLogCtl->info_lck);
2799-
28002808
/* done already? */
2809+
RefreshXLogWriteResult(LogwrtResult);
28012810
if (record <= LogwrtResult.Flush)
28022811
break;
28032812

28042813
/*
28052814
* Before actually performing the write, wait for all in-flight
28062815
* insertions to the pages we're about to write to finish.
28072816
*/
2817+
SpinLockAcquire(&XLogCtl->info_lck);
2818+
if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
2819+
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
2820+
SpinLockRelease(&XLogCtl->info_lck);
28082821
insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
28092822

28102823
/*
@@ -2947,16 +2960,16 @@ XLogBackgroundFlush(void)
29472960
*/
29482961
insertTLI = XLogCtl->InsertTimeLineID;
29492962

2950-
/* read LogwrtResult and update local state */
2963+
/* read updated LogwrtRqst */
29512964
SpinLockAcquire(&XLogCtl->info_lck);
2952-
RefreshXLogWriteResult(LogwrtResult);
29532965
WriteRqst = XLogCtl->LogwrtRqst;
29542966
SpinLockRelease(&XLogCtl->info_lck);
29552967

29562968
/* back off to last completed page boundary */
29572969
WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
29582970

29592971
/* if we have already flushed that far, consider async commit records */
2972+
RefreshXLogWriteResult(LogwrtResult);
29602973
if (WriteRqst.Write <= LogwrtResult.Flush)
29612974
{
29622975
SpinLockAcquire(&XLogCtl->info_lck);
@@ -3125,9 +3138,7 @@ XLogNeedsFlush(XLogRecPtr record)
31253138
return false;
31263139

31273140
/* read LogwrtResult and update local state */
3128-
SpinLockAcquire(&XLogCtl->info_lck);
31293141
RefreshXLogWriteResult(LogwrtResult);
3130-
SpinLockRelease(&XLogCtl->info_lck);
31313142

31323143
/* check again */
31333144
if (record <= LogwrtResult.Flush)
@@ -4940,6 +4951,8 @@ XLOGShmemInit(void)
49404951

49414952
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
49424953
SpinLockInit(&XLogCtl->info_lck);
4954+
pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
4955+
pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
49434956
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
49444957
}
49454958

@@ -5961,11 +5974,13 @@ StartupXLOG(void)
59615974
XLogCtl->InitializedUpTo = EndOfLog;
59625975
}
59635976

5977+
/*
5978+
* Update local and shared status. This is OK to do without any locks
5979+
* because no other process can be reading or writing WAL yet.
5980+
*/
59645981
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
5965-
5966-
XLogCtl->logWriteResult = LogwrtResult.Write;
5967-
XLogCtl->logFlushResult = LogwrtResult.Flush;
5968-
5982+
pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
5983+
pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
59695984
XLogCtl->LogwrtRqst.Write = EndOfLog;
59705985
XLogCtl->LogwrtRqst.Flush = EndOfLog;
59715986

@@ -6410,9 +6425,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
64106425
{
64116426
Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
64126427

6413-
SpinLockAcquire(&XLogCtl->info_lck);
64146428
RefreshXLogWriteResult(LogwrtResult);
6415-
SpinLockRelease(&XLogCtl->info_lck);
64166429

64176430
/*
64186431
* If we're writing and flushing WAL, the time line can't be changing, so
@@ -9326,9 +9339,7 @@ GetXLogInsertRecPtr(void)
93269339
XLogRecPtr
93279340
GetXLogWriteRecPtr(void)
93289341
{
9329-
SpinLockAcquire(&XLogCtl->info_lck);
93309342
RefreshXLogWriteResult(LogwrtResult);
9331-
SpinLockRelease(&XLogCtl->info_lck);
93329343

93339344
return LogwrtResult.Write;
93349345
}

0 commit comments

Comments
 (0)