@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping.  If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,37 @@ typedef struct XLogCtlData
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
-	 * Latest initialized page in the cache (last byte position + 1).
+	 * First initialized page in the cache (first byte position).
+	 */
+	XLogRecPtr	InitializedFrom;
+
+	/*
+	 * Latest page in the cache reserved for initialization (last byte
+	 * position + 1).
 	 *
-	 * To change the identity of a buffer (and InitializedUpTo), you need to
-	 * hold WALBufMappingLock.  To change the identity of a buffer that's
+	 * To change the identity of a buffer, you need to advance
+	 * InitializeReserved first.  To change the identity of a buffer that's
 	 * still dirty, the old page needs to be written out first, and for that
 	 * you need WALWriteLock, and you need to ensure that there are no
 	 * in-progress insertions to the page by calling
 	 * WaitXLogInsertionsToFinish().
 	 */
-	XLogRecPtr	InitializedUpTo;
+	pg_atomic_uint64 InitializeReserved;
+
+	/*
+	 * Latest initialized page in the cache (last byte position + 1).
+	 *
+	 * InitializedUpTo is updated after the buffer initialization.  After the
+	 * update, waiters are notified using InitializedUpToCondVar.
+	 */
+	pg_atomic_uint64 InitializedUpTo;
+	ConditionVariable InitializedUpToCondVar;
 
 	/*
 	 * These values do not change after startup, although the pointed-to pages
-	 * and xlblocks values certainly do.  xlblocks values are protected by
-	 * WALBufMappingLock.
+	 * and xlblocks values certainly do.  xlblocks values are changed
+	 * lock-free, guarded by a check against the xlog write position, and are
+	 * accompanied by changes of InitializeReserved and InitializedUpTo.
 	 */
 	char	   *pages;			/* buffers for unwritten XLOG pages */
 	pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
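Taken together, the new fields implement a small hand-off protocol: InitializeReserved hands out pages to be initialized, InitializedUpTo records how far initialization has actually completed, and InitializedUpToCondVar wakes up anyone waiting for it to advance. A minimal sketch of the waiter side, using the same primitives and wait event as the hunks further down (the helper name is hypothetical and not part of the patch):

/* Sketch only: block until every byte before 'target' is initialized. */
static void
WaitForWALBufferInit(XLogRecPtr target)
{
	while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < target)
		ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar,
							   WAIT_EVENT_WAL_BUFFER_INIT);
	ConditionVariableCancelSleep();
}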
@@ -810,9 +821,9 @@ XLogInsertRecord(XLogRecData *rdata,
 	 * fullPageWrites from changing until the insertion is finished.
 	 *
 	 * Step 2 can usually be done completely in parallel. If the required WAL
-	 * page is not initialized yet, you have to grab WALBufMappingLock to
-	 * initialize it, but the WAL writer tries to do that ahead of insertions
-	 * to avoid that from happening in the critical path.
+	 * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+	 * which will ensure it is initialized.  But the WAL writer tries to do that
+	 * ahead of insertions, to keep it off the critical path.
 	 *
 	 *----------
 	 */
@@ -1991,32 +2002,79 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
+	XLogRecPtr	ReservedPtr;
 	int			npages pg_attribute_unused() = 0;
 
-	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
 	/*
-	 * Now that we have the lock, check if someone initialized the page
-	 * already.
+	 * We must run the loop below inside a critical section, as we expect
+	 * XLogCtl->InitializedUpTo to eventually keep up.  Most callers already
+	 * run inside a critical section; the exception is the WAL writer, which
+	 * passes 'opportunistic == true', and for which we don't perform
+	 * operations that could error out.
+	 *
+	 * Start an explicit critical section anyway though.
+	 */
+	Assert(CritSectionCount > 0 || opportunistic);
+	START_CRIT_SECTION();
+
+	/*--
+	 * Loop until all the pages in the WAL buffer before 'upto' are reserved
+	 * for initialization.  Multiple processes can run this loop in parallel,
+	 * each initializing different buffers, as follows.
+	 *
+	 * 1. Reserve a page for initialization using XLogCtl->InitializeReserved.
+	 * 2. Initialize the reserved page.
+	 * 3. Attempt to advance XLogCtl->InitializedUpTo.
 	 */
-	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+	ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+	while (upto >= ReservedPtr || opportunistic)
 	{
-		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+		Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
 		/*
-		 * Get ending-offset of the buffer page we need to replace (this may
-		 * be zero if the buffer hasn't been used yet).  Fall through if it's
-		 * already written out.
+		 * Get the ending offset of the buffer page we need to replace.
+		 *
+		 * We don't look it up in xlblocks, but rather calculate the position
+		 * we must wait to be written.  If it has been written, xlblocks will
+		 * hold this position (or be uninitialized).
 		 */
-		OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-		if (LogwrtResult.Write < OldPageRqstPtr)
+		if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
+			OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+		else
+			OldPageRqstPtr = InvalidXLogRecPtr;
+
+		if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
 		{
 			/*
-			 * Nope, got work to do. If we just want to pre-initialize as much
-			 * as we can without flushing, give up now.
+			 * If we just want to pre-initialize as much as we can without
+			 * flushing, give up now.
 			 */
-			if (opportunistic)
-				break;
+			upto = ReservedPtr - 1;
+			break;
+		}
+
+		/*
+		 * Attempt to reserve the page for initialization.  Failure means
+		 * that this page got reserved by another process.
+		 */
+		if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+											&ReservedPtr,
+											ReservedPtr + XLOG_BLCKSZ))
+			continue;
+
+		/*
+		 * Wait until everything up to OldPageRqstPtr has been correctly
+		 * initialized.
+		 */
+		nextidx = XLogRecPtrToBufIdx(ReservedPtr);
+		while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
+			ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+		ConditionVariableCancelSleep();
+		Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
+
+		/* Fall through if it's already written out. */
+		if (LogwrtResult.Write < OldPageRqstPtr)
+		{
+			/* Nope, got work to do. */
 
 			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
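To make the eviction arithmetic in the hunk above concrete, here is a standalone sketch of the OldPageRqstPtr computation with one worked example. The constants (8 kB pages, 4 buffers, InitializedFrom = 0) and the helper name are illustrative assumptions, not values from the patch:

#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr	0
#define XLOG_BLCKSZ			8192	/* illustrative page size */
#define XLOGbuffers			4		/* illustrative ring size */

/* Which old page must be out of the way before ReservedPtr's slot is reused? */
static XLogRecPtr
old_page_rqst_ptr(XLogRecPtr ReservedPtr, XLogRecPtr InitializedFrom)
{
	if (ReservedPtr + XLOG_BLCKSZ > InitializedFrom + (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers)
		return ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
	return InvalidXLogRecPtr;	/* still within the first ring of buffers */
}

int
main(void)
{
	/* Pages 0..3 fit in the ring without evicting anything. */
	assert(old_page_rqst_ptr(3 * XLOG_BLCKSZ, 0) == InvalidXLogRecPtr);

	/*
	 * The page starting at 32768 wraps around to buffer slot 0, so the old
	 * page ending at 8192 must be initialized and written out before the
	 * slot can be recycled: 32768 + 8192 - 4 * 8192 = 8192.
	 */
	assert(old_page_rqst_ptr(4 * XLOG_BLCKSZ, 0) == XLOG_BLCKSZ);
	return 0;
}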
@@ -2031,14 +2089,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
-				/*
-				 * Must acquire write lock. Release WALBufMappingLock first,
-				 * to make sure that all insertions that we need to wait for
-				 * can finish (up to this same position). Otherwise we risk
-				 * deadlock.
-				 */
-				LWLockRelease(WALBufMappingLock);
-
 				WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,21 +2110,16 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 				pgWalUsage.wal_buffers_full++;
 				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
 			}
-			/* Re-acquire WALBufMappingLock and retry */
-			LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-			continue;
 		}
 
 		/*
 		 * Now the next buffer slot is free and we can set it up to be the
 		 * next output page.
 		 */
-		NewPageBeginPtr = XLogCtl->InitializedUpTo;
+		NewPageBeginPtr = ReservedPtr;
 		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-		Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
-
 		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
 		/*
@@ -2138,12 +2183,100 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 */
 		pg_write_barrier();
 
+		/*-----
+		 * Update the value of XLogCtl->xlblocks[nextidx] and try to advance
+		 * XLogCtl->InitializedUpTo in a lock-less manner.
+		 *
+		 * First, let's provide a formal proof of the algorithm.  Let there be
+		 * 'n' processes with the following variables in shared memory:
+		 *	f - an array of 'n' boolean flags,
+		 *	v - an atomic integer variable.
+		 *
+		 * Also, let
+		 *	i - the number of a process,
+		 *	j - a local integer variable,
+		 *	CAS(var, oldval, newval) - a compare-and-swap atomic operation
+		 *							   returning true on success,
+		 *	write_barrier()/read_barrier() - memory barriers.
+		 *
+		 * The pseudocode for each process is the following.
+		 *
+		 *	j := i
+		 *	f[i] := true
+		 *	write_barrier()
+		 *	while CAS(v, j, j + 1):
+		 *		j := j + 1
+		 *		read_barrier()
+		 *		if not f[j]:
+		 *			break
+		 *
+		 * Let's prove that v eventually reaches the value of n.
+		 * 1. Prove by contradiction.  Assume v doesn't reach n and gets
+		 *	  stuck at k, where k < n.
+		 * 2. Process k attempts CAS(v, k, k + 1).  If, as we assumed, v gets
+		 *	  stuck at k, then this CAS operation must fail.  Therefore,
+		 *	  v < k when process k attempts CAS(v, k, k + 1).
+		 * 3. If, as we assumed, v gets stuck at k, then the value k of v
+		 *	  must be achieved by some process m, where m < k.  The process
+		 *	  m must observe f[k] == false.  Otherwise, it would later
+		 *	  attempt CAS(v, k, k + 1) with success.
+		 * 4. Therefore, the corresponding read_barrier() (while j == k) on
+		 *	  process m happened before the write_barrier() of process k.
+		 *	  But then process k attempts CAS(v, k, k + 1) after process m
+		 *	  has successfully incremented v to k, and that CAS operation
+		 *	  must succeed.  That leads to a contradiction.  So, there is no
+		 *	  such k (k < n) where v gets stuck.  Q.E.D.
+		 *
+		 * To apply this proof to the code below, we let
+		 * XLogCtl->InitializedUpTo play the role of v with XLOG_BLCKSZ
+		 * granularity, and setting XLogCtl->xlblocks[nextidx] to
+		 * NewPageEndPtr play the role of setting f[i] to true.  Also, note
+		 * that processes can't concurrently map different xlog locations to
+		 * the same nextidx because we previously waited until
+		 * XLogCtl->InitializedUpTo >= OldPageRqstPtr.  So, an xlog buffer
+		 * can be taken for initialization only once the previous
+		 * initialization has taken effect on XLogCtl->InitializedUpTo.
+		 */
+
 		pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-		XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+		pg_write_barrier();
+
+		while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+		{
+			NewPageBeginPtr = NewPageEndPtr;
+			NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+			nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+			pg_read_barrier();
+
+			if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+			{
+				/*
+				 * The page at nextidx wasn't initialized yet, so we can't
+				 * move InitializedUpTo further.  It will be moved by the
+				 * backend which will initialize nextidx.
+				 */
+				ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+				break;
+			}
+		}
 
 		npages++;
 	}
-	LWLockRelease(WALBufMappingLock);
+
+	END_CRIT_SECTION();
+
+	/*
+	 * All the pages in the WAL buffer before 'upto' were reserved for
+	 * initialization.  However, some pages might have been reserved by
+	 * concurrent processes.  Wait until they finish initialization.
+	 */
+	while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+		ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+	ConditionVariableCancelSleep();
+
+	pg_read_barrier();
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG && npages > 0)
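The proof above argues about an abstract model: flags f[] and a counter v. Below is a self-contained C11 translation of that pseudocode (my own sketch, not code from the patch); acquire/release atomics stand in for the explicit barriers, pthreads stand in for backends, and N is an arbitrary illustrative count. Its final state, v == N, is exactly the property the proof establishes for InitializedUpTo.

/*
 * Standalone model of the pseudocode in the proof above, using C11 atomics
 * and pthreads in place of PostgreSQL primitives.  Thread i plays "process
 * i": it publishes its flag (like setting xlblocks[nextidx]) and then tries
 * to carry the shared counter v (like InitializedUpTo) over every flag that
 * is already set.  Build with: cc -std=c11 -pthread proof_model.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define N 8						/* number of processes/pages; illustrative */

static atomic_bool f[N];		/* f[i]: page i has been initialized */
static atomic_uint v;			/* v: how far initialization is complete */

static void *
worker(void *arg)
{
	unsigned	i = (unsigned) (uintptr_t) arg;
	unsigned	j = i;

	/* "f[i] := true; write_barrier()" -- release publishes the flag. */
	atomic_store_explicit(&f[i], true, memory_order_release);

	/* "while CAS(v, j, j + 1): j := j + 1; read_barrier(); if not f[j]: break" */
	while (atomic_compare_exchange_strong(&v, &j, j + 1))
	{
		j = j + 1;
		if (j == N)
			break;				/* carried v all the way to the end */
		if (!atomic_load_explicit(&f[j], memory_order_acquire))
			break;				/* next flag not published yet; its owner continues */
	}
	return NULL;
}

int
main(void)
{
	pthread_t	tid[N];

	for (uintptr_t i = 0; i < N; i++)
		pthread_create(&tid[i], NULL, worker, (void *) i);
	for (int i = 0; i < N; i++)
		pthread_join(tid[i], NULL);

	/* The claim proved above: v reaches N under every interleaving. */
	printf("v = %u (expected %d)\n", atomic_load(&v), N);
	return 0;
}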
@@ -5071,6 +5204,10 @@ XLOGShmemInit(void)
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+	pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+	ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6090,7 +6227,8 @@ StartupXLOG(void)
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-		XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+		XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
 	}
 	else
 	{
@@ -6099,8 +6237,10 @@ StartupXLOG(void)
 		 * let the first attempt to insert a log record to initialize the next
 		 * buffer.
 		 */
-		XLogCtl->InitializedUpTo = EndOfLog;
+		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+		XLogCtl->InitializedFrom = EndOfLog;
 	}
+	pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
 	/*
 	 * Update local and shared status.  This is OK to do without any locks
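With either StartupXLOG branch above, the new fields start out consistent: nothing is reserved beyond what is already initialized, and InitializedFrom marks the start of the (at most one) pre-initialized page. Expressed as assertions, purely as an illustration (these checks are not in the patch):

/* Sketch: state of the new fields right after StartupXLOG sets them up. */
Assert(pg_atomic_read_u64(&XLogCtl->InitializeReserved) ==
	   pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
Assert(XLogCtl->InitializedFrom <= pg_atomic_read_u64(&XLogCtl->InitializedUpTo));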