@@ -292,12 +292,7 @@ static bool doPageWrites;
292
292
* LogwrtRqst indicates a byte position that we need to write and/or fsync
293
293
* the log up to (all records before that point must be written or fsynced).
294
294
* The positions already written/fsynced are maintained in logWriteResult
295
- * and logFlushResult.
296
- *
297
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
298
- * info_lck or WALWriteLock. To update them, you need to hold both locks.
299
- * The point of this arrangement is that the value can be examined by code
300
- * that already holds WALWriteLock without needing to grab info_lck as well.
295
+ * and logFlushResult using atomic access.
301
296
* In addition to the shared variable, each backend has a private copy of
302
297
* both in LogwrtResult, which is updated when convenient.
303
298
*
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
473
468
pg_time_t lastSegSwitchTime ;
474
469
XLogRecPtr lastSegSwitchLSN ;
475
470
476
- /*
477
- * Protected by info_lck and WALWriteLock (you must hold either lock to
478
- * read it, but both to update)
479
- */
480
- XLogRecPtr logWriteResult ; /* last byte + 1 written out */
481
- XLogRecPtr logFlushResult ; /* last byte + 1 flushed */
471
+ /* These are accessed using atomics -- info_lck not needed */
472
+ pg_atomic_uint64 logWriteResult ; /* last byte + 1 written out */
473
+ pg_atomic_uint64 logFlushResult ; /* last byte + 1 flushed */
482
474
483
475
/*
484
476
* Latest initialized page in the cache (last byte position + 1).
@@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0};
616
608
617
609
/*
618
610
* Update local copy of shared XLogCtl->log{Write,Flush}Result
611
+ *
612
+ * It's critical that Flush always trails Write, so the order of the reads is
613
+ * important, as is the barrier. See also XLogWrite.
619
614
*/
620
615
#define RefreshXLogWriteResult (_target ) \
621
616
do { \
622
- _target.Write = XLogCtl->logWriteResult; \
623
- _target.Flush = XLogCtl->logFlushResult; \
617
+ _target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \
618
+ pg_read_barrier(); \
619
+ _target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \
624
620
} while (0)
625
621
626
622
/*
@@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
968
964
/* advance global request to include new block(s) */
969
965
if (XLogCtl -> LogwrtRqst .Write < EndPos )
970
966
XLogCtl -> LogwrtRqst .Write = EndPos ;
971
- /* update local result copy while I have the chance */
972
- RefreshXLogWriteResult (LogwrtResult );
973
967
SpinLockRelease (& XLogCtl -> info_lck );
968
+ RefreshXLogWriteResult (LogwrtResult );
974
969
}
975
970
976
971
/*
@@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
1989
1984
if (opportunistic )
1990
1985
break ;
1991
1986
1992
- /* Before waiting, get info_lck and update LogwrtResult */
1987
+ /* Advance shared memory write request position */
1993
1988
SpinLockAcquire (& XLogCtl -> info_lck );
1994
1989
if (XLogCtl -> LogwrtRqst .Write < OldPageRqstPtr )
1995
1990
XLogCtl -> LogwrtRqst .Write = OldPageRqstPtr ;
1996
- RefreshXLogWriteResult (LogwrtResult );
1997
1991
SpinLockRelease (& XLogCtl -> info_lck );
1998
1992
1999
1993
/*
2000
- * Now that we have an up-to-date LogwrtResult value, see if we
2001
- * still need to write it or if someone else already did.
1994
+ * Acquire an up-to-date LogwrtResult value and see if we still
1995
+ * need to write it or if someone else already did.
2002
1996
*/
1997
+ RefreshXLogWriteResult (LogwrtResult );
2003
1998
if (LogwrtResult .Write < OldPageRqstPtr )
2004
1999
{
2005
2000
/*
@@ -2556,16 +2551,35 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
2556
2551
* 'result' values. This is not absolutely essential, but it saves some
2557
2552
* code in a couple of places.
2558
2553
*/
2554
+ SpinLockAcquire (& XLogCtl -> info_lck );
2555
+ if (XLogCtl -> LogwrtRqst .Write < LogwrtResult .Write )
2556
+ XLogCtl -> LogwrtRqst .Write = LogwrtResult .Write ;
2557
+ if (XLogCtl -> LogwrtRqst .Flush < LogwrtResult .Flush )
2558
+ XLogCtl -> LogwrtRqst .Flush = LogwrtResult .Flush ;
2559
+ SpinLockRelease (& XLogCtl -> info_lck );
2560
+
2561
+ /*
2562
+ * We write Write first, then a barrier, then Flush. When reading, the opposite must
2563
+ * be done (with a matching barrier in between), so that we always see a
2564
+ * Flush value that trails behind the Write value seen.
2565
+ */
2566
+ pg_atomic_write_u64 (& XLogCtl -> logWriteResult , LogwrtResult .Write );
2567
+ pg_write_barrier ();
2568
+ pg_atomic_write_u64 (& XLogCtl -> logFlushResult , LogwrtResult .Flush );
2569
+
2570
+ #ifdef USE_ASSERT_CHECKING
2559
2571
{
2560
- SpinLockAcquire (& XLogCtl -> info_lck );
2561
- XLogCtl -> logWriteResult = LogwrtResult .Write ;
2562
- XLogCtl -> logFlushResult = LogwrtResult .Flush ;
2563
- if (XLogCtl -> LogwrtRqst .Write < LogwrtResult .Write )
2564
- XLogCtl -> LogwrtRqst .Write = LogwrtResult .Write ;
2565
- if (XLogCtl -> LogwrtRqst .Flush < LogwrtResult .Flush )
2566
- XLogCtl -> LogwrtRqst .Flush = LogwrtResult .Flush ;
2567
- SpinLockRelease (& XLogCtl -> info_lck );
2572
+ XLogRecPtr Flush ;
2573
+ XLogRecPtr Write ;
2574
+
2575
+ Flush = pg_atomic_read_u64 (& XLogCtl -> logFlushResult );
2576
+ pg_read_barrier ();
2577
+ Write = pg_atomic_read_u64 (& XLogCtl -> logWriteResult );
2578
+
2579
+ /* WAL written out is never behind WAL flushed (they may be equal) */
2580
+ Assert (Write >= Flush );
2568
2581
}
2582
+ #endif
2569
2583
}
2570
2584
2571
2585
/*
@@ -2582,7 +2596,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
2582
2596
XLogRecPtr prevAsyncXactLSN ;
2583
2597
2584
2598
SpinLockAcquire (& XLogCtl -> info_lck );
2585
- RefreshXLogWriteResult (LogwrtResult );
2586
2599
sleeping = XLogCtl -> WalWriterSleeping ;
2587
2600
prevAsyncXactLSN = XLogCtl -> asyncXactLSN ;
2588
2601
if (XLogCtl -> asyncXactLSN < asyncXactLSN )
@@ -2608,6 +2621,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
2608
2621
{
2609
2622
int flushblocks ;
2610
2623
2624
+ RefreshXLogWriteResult (LogwrtResult );
2625
+
2611
2626
flushblocks =
2612
2627
WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult .Flush / XLOG_BLCKSZ ;
2613
2628
@@ -2790,21 +2805,19 @@ XLogFlush(XLogRecPtr record)
2790
2805
{
2791
2806
XLogRecPtr insertpos ;
2792
2807
2793
- /* read LogwrtResult and update local state */
2794
- SpinLockAcquire (& XLogCtl -> info_lck );
2795
- if (WriteRqstPtr < XLogCtl -> LogwrtRqst .Write )
2796
- WriteRqstPtr = XLogCtl -> LogwrtRqst .Write ;
2797
- RefreshXLogWriteResult (LogwrtResult );
2798
- SpinLockRelease (& XLogCtl -> info_lck );
2799
-
2800
2808
/* done already? */
2809
+ RefreshXLogWriteResult (LogwrtResult );
2801
2810
if (record <= LogwrtResult .Flush )
2802
2811
break ;
2803
2812
2804
2813
/*
2805
2814
* Before actually performing the write, wait for all in-flight
2806
2815
* insertions to the pages we're about to write to finish.
2807
2816
*/
2817
+ SpinLockAcquire (& XLogCtl -> info_lck );
2818
+ if (WriteRqstPtr < XLogCtl -> LogwrtRqst .Write )
2819
+ WriteRqstPtr = XLogCtl -> LogwrtRqst .Write ;
2820
+ SpinLockRelease (& XLogCtl -> info_lck );
2808
2821
insertpos = WaitXLogInsertionsToFinish (WriteRqstPtr );
2809
2822
2810
2823
/*
@@ -2947,16 +2960,16 @@ XLogBackgroundFlush(void)
2947
2960
*/
2948
2961
insertTLI = XLogCtl -> InsertTimeLineID ;
2949
2962
2950
- /* read LogwrtResult and update local state */
2963
+ /* read updated LogwrtRqst */
2951
2964
SpinLockAcquire (& XLogCtl -> info_lck );
2952
- RefreshXLogWriteResult (LogwrtResult );
2953
2965
WriteRqst = XLogCtl -> LogwrtRqst ;
2954
2966
SpinLockRelease (& XLogCtl -> info_lck );
2955
2967
2956
2968
/* back off to last completed page boundary */
2957
2969
WriteRqst .Write -= WriteRqst .Write % XLOG_BLCKSZ ;
2958
2970
2959
2971
/* if we have already flushed that far, consider async commit records */
2972
+ RefreshXLogWriteResult (LogwrtResult );
2960
2973
if (WriteRqst .Write <= LogwrtResult .Flush )
2961
2974
{
2962
2975
SpinLockAcquire (& XLogCtl -> info_lck );
@@ -3125,9 +3138,7 @@ XLogNeedsFlush(XLogRecPtr record)
3125
3138
return false;
3126
3139
3127
3140
/* read LogwrtResult and update local state */
3128
- SpinLockAcquire (& XLogCtl -> info_lck );
3129
3141
RefreshXLogWriteResult (LogwrtResult );
3130
- SpinLockRelease (& XLogCtl -> info_lck );
3131
3142
3132
3143
/* check again */
3133
3144
if (record <= LogwrtResult .Flush )
@@ -4940,6 +4951,8 @@ XLOGShmemInit(void)
4940
4951
4941
4952
SpinLockInit (& XLogCtl -> Insert .insertpos_lck );
4942
4953
SpinLockInit (& XLogCtl -> info_lck );
4954
+ pg_atomic_init_u64 (& XLogCtl -> logWriteResult , InvalidXLogRecPtr );
4955
+ pg_atomic_init_u64 (& XLogCtl -> logFlushResult , InvalidXLogRecPtr );
4943
4956
pg_atomic_init_u64 (& XLogCtl -> unloggedLSN , InvalidXLogRecPtr );
4944
4957
}
4945
4958
@@ -5961,11 +5974,13 @@ StartupXLOG(void)
5961
5974
XLogCtl -> InitializedUpTo = EndOfLog ;
5962
5975
}
5963
5976
5977
+ /*
5978
+ * Update local and shared status. This is OK to do without any locks
5979
+ * because no other process can be reading or writing WAL yet.
5980
+ */
5964
5981
LogwrtResult .Write = LogwrtResult .Flush = EndOfLog ;
5965
-
5966
- XLogCtl -> logWriteResult = LogwrtResult .Write ;
5967
- XLogCtl -> logFlushResult = LogwrtResult .Flush ;
5968
-
5982
+ pg_atomic_write_u64 (& XLogCtl -> logWriteResult , EndOfLog );
5983
+ pg_atomic_write_u64 (& XLogCtl -> logFlushResult , EndOfLog );
5969
5984
XLogCtl -> LogwrtRqst .Write = EndOfLog ;
5970
5985
XLogCtl -> LogwrtRqst .Flush = EndOfLog ;
5971
5986
@@ -6410,9 +6425,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
6410
6425
{
6411
6426
Assert (XLogCtl -> SharedRecoveryState == RECOVERY_STATE_DONE );
6412
6427
6413
- SpinLockAcquire (& XLogCtl -> info_lck );
6414
6428
RefreshXLogWriteResult (LogwrtResult );
6415
- SpinLockRelease (& XLogCtl -> info_lck );
6416
6429
6417
6430
/*
6418
6431
* If we're writing and flushing WAL, the time line can't be changing, so
@@ -9326,9 +9339,7 @@ GetXLogInsertRecPtr(void)
9326
9339
XLogRecPtr
9327
9340
GetXLogWriteRecPtr (void )
9328
9341
{
9329
- SpinLockAcquire (& XLogCtl -> info_lck );
9330
9342
RefreshXLogWriteResult (LogwrtResult );
9331
- SpinLockRelease (& XLogCtl -> info_lck );
9332
9343
9333
9344
return LogwrtResult .Write ;
9334
9345
}
0 commit comments