  * locking should be done with the full lock manager --- which depends on
  * LWLocks to protect its shared state.
  *
- * In addition to exclusive and shared modes, lightweight locks can be used
- * to wait until a variable changes value. The variable is initially set
- * when the lock is acquired with LWLockAcquireWithVar, and can be updated
+ * In addition to exclusive and shared modes, lightweight locks can be used to
+ * wait until a variable changes value. The variable is initially not set
+ * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
+ * value it was set to when the lock was released last, and can be updated
  * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
- * waits for the variable to be updated, or until the lock is free. The
- * meaning of the variable is up to the caller, the lightweight lock code
- * just assigns and compares it.
+ * waits for the variable to be updated, or until the lock is free. When
+ * releasing the lock with LWLockReleaseClearVar() the value can be set to an
+ * appropriate value for a free lock. The meaning of the variable is up to
+ * the caller, the lightweight lock code just assigns and compares it.
  *
  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
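As a reader's aid, and not part of the patch itself: the protocol described in the header comment could be used roughly as sketched below. The struct, the function names and the "progress" variable are invented for illustration; only the LWLock* calls are the real API.

#include "postgres.h"
#include "storage/lwlock.h"

/* Hypothetical shared state; only the lock holder may modify 'progress'. */
typedef struct SharedCounter
{
    LWLock     *lock;
    uint64      progress;
} SharedCounter;

/* Holder side: advertise progress while continuing to hold the lock. */
static void
do_work(SharedCounter *c)
{
    uint64      step;

    LWLockAcquire(c->lock, LW_EXCLUSIVE);
    for (step = 1; step <= 10; step++)
    {
        /* ... perform one unit of work ... */

        /* Publish the new value without releasing the lock; wakes waiters. */
        LWLockUpdateVar(c->lock, &c->progress, step);
    }

    /* Release, resetting the variable to a value meaning "no work pending". */
    LWLockReleaseClearVar(c->lock, &c->progress, 0);
}

/* Waiter side: sleep until progress reaches 'target' or the lock is free. */
static void
wait_for_progress(SharedCounter *c, uint64 target)
{
    uint64      seen = 0;

    while (seen < target)
    {
        uint64      newval;

        /* Returns true once the lock is free, false when the value changed. */
        if (LWLockWaitForVar(c->lock, &c->progress, seen, &newval))
            break;              /* holder is gone; nothing left to wait for */
        seen = newval;
    }
}

This is roughly the shape of how the WAL insertion locks use their insertingAt position in xlog.c, the in-tree consumer of LWLockUpdateVar/LWLockWaitForVar.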
@@ -150,9 +152,6 @@ static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 static int  lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
 
-static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode,
-                    uint64 *valptr, uint64 val);
-
 #ifdef LWLOCK_STATS
 typedef struct lwlock_stats_key
 {
@@ -899,25 +898,7 @@ LWLockDequeueSelf(LWLock *lock)
  * Side effect: cancel/die interrupts are held off until lock release.
  */
 bool
-LWLockAcquire(LWLock *l, LWLockMode mode)
-{
-    return LWLockAcquireCommon(l, mode, NULL, 0);
-}
-
-/*
- * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
- *
- * The lock is always acquired in exclusive mode with this function.
- */
-bool
-LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
-{
-    return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
-}
-
-/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
-static inline bool
-LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
+LWLockAcquire(LWLock *lock, LWLockMode mode)
 {
     PGPROC     *proc = MyProc;
     bool        result = true;
@@ -1064,10 +1045,6 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
         result = false;
     }
 
-    /* If there's a variable associated with this lock, initialize it */
-    if (valptr)
-        *valptr = val;
-
     TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
 
     /* Add lock to list of locks held by this backend */
@@ -1258,6 +1235,71 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
     return !mustwait;
 }
 
+/*
+ * Does the lwlock in its current state need to wait for the variable value to
+ * change?
+ *
+ * If we don't need to wait, and it's because the value of the variable has
+ * changed, store the current value in newval.
+ *
+ * *result is set to true if the lock was free, and false otherwise.
+ */
+static bool
+LWLockConflictsWithVar(LWLock *lock,
+                       uint64 *valptr, uint64 oldval, uint64 *newval,
+                       bool *result)
+{
+    bool        mustwait;
+    uint64      value;
+#ifdef LWLOCK_STATS
+    lwlock_stats *lwstats;
+
+    lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+    /*
+     * Test first to see if the slot is free right now.
+     *
+     * XXX: the caller uses a spinlock before this, so we don't need a memory
+     * barrier here as far as the current usage is concerned. But that might
+     * not be safe in general.
+     */
+    mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+
+    if (!mustwait)
+    {
+        *result = true;
+        return false;
+    }
+
+    *result = false;
+
+    /*
+     * Read value using spinlock as we can't rely on atomic 64 bit
+     * reads/stores. TODO: On platforms with a way to do atomic 64 bit
+     * reads/writes the spinlock could be optimized away.
+     */
+#ifdef LWLOCK_STATS
+    lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+    SpinLockAcquire(&lock->mutex);
+#endif
+    value = *valptr;
+    SpinLockRelease(&lock->mutex);
+
+    if (value != oldval)
+    {
+        mustwait = false;
+        *newval = value;
+    }
+    else
+    {
+        mustwait = true;
+    }
+
+    return mustwait;
+}
+
 /*
  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
  *
@@ -1268,11 +1310,6 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
  * matches oldval, returns false and sets *newval to the current value in
  * *valptr.
  *
- * It's possible that the lock holder releases the lock, but another backend
- * acquires it again before we get a chance to observe that the lock was
- * momentarily released. We wouldn't need to wait for the new lock holder,
- * but we cannot distinguish that case, so we will have to wait.
- *
  * Note: this function ignores shared lock holders; if the lock is held
  * in shared mode, returns 'true'.
  */
@@ -1290,16 +1327,6 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 
     PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
 
-    /*
-     * Quick test first to see if it the slot is free right now.
-     *
-     * XXX: the caller uses a spinlock before this, so we don't need a memory
-     * barrier here as far as the current usage is concerned. But that might
-     * not be safe in general.
-     */
-    if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
-        return true;
-
     /*
      * Lock out cancel/die interrupts while we sleep on the lock. There is no
      * cleanup mechanism to remove us from the wait queue if we got
@@ -1313,39 +1340,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
     for (;;)
     {
         bool        mustwait;
-        uint64      value;
-
-        mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
-
-        if (mustwait)
-        {
-            /*
-             * Perform comparison using spinlock as we can't rely on atomic 64
-             * bit reads/stores.
-             */
-#ifdef LWLOCK_STATS
-            lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-            SpinLockAcquire(&lock->mutex);
-#endif
 
-            /*
-             * XXX: We can significantly optimize this on platforms with 64bit
-             * atomics.
-             */
-            value = *valptr;
-            if (value != oldval)
-            {
-                result = false;
-                mustwait = false;
-                *newval = value;
-            }
-            else
-                mustwait = true;
-            SpinLockRelease(&lock->mutex);
-        }
-        else
-            mustwait = false;
+        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
+                                          &result);
 
         if (!mustwait)
             break;              /* the lock was free or value didn't match */
@@ -1354,7 +1351,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
          * Add myself to wait queue. Note that this is racy, somebody else
          * could wakeup before we're finished queuing. NB: We're using nearly
          * the same twice-in-a-row lock acquisition protocol as
-         * LWLockAcquire(). Check its comments for details.
+         * LWLockAcquire(). Check its comments for details. The only
+         * difference is that we also have to check the variable's value when
+         * checking the state of the lock.
          */
         LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
@@ -1365,12 +1364,13 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
         pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
 
         /*
-         * We're now guaranteed to be woken up if necessary. Recheck the
-         * lock's state.
+         * We're now guaranteed to be woken up if necessary. Recheck the lock
+         * and variable's state.
          */
-        mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
+                                          &result);
 
-        /* Ok, lock is free after we queued ourselves. Undo queueing. */
+        /* Ok, no conflict after we queued ourselves. Undo queueing. */
         if (!mustwait)
         {
             LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
@@ -1587,6 +1587,31 @@ LWLockRelease(LWLock *lock)
     RESUME_INTERRUPTS();
 }
 
+/*
+ * LWLockReleaseClearVar - release a previously acquired lock, reset variable
+ */
+void
+LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
+{
+#ifdef LWLOCK_STATS
+    lwlock_stats *lwstats;
+
+    lwstats = get_lwlock_stats_entry(lock);
+    lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+    SpinLockAcquire(&lock->mutex);
+#endif
+    /*
+     * Set the variable's value before releasing the lock; that prevents a
+     * race condition wherein a new locker acquires the lock, but hasn't yet
+     * set the variable's value.
+     */
+    *valptr = val;
+    SpinLockRelease(&lock->mutex);
+
+    LWLockRelease(lock);
+}
+
 
 /*
  * LWLockReleaseAll - release all currently-held locks
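One more illustration, again with invented names and not part of the patch: the reason LWLockReleaseClearVar() stores the value before calling LWLockRelease() is the interleaving sketched in the comment below.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Hypothetical release helper, to illustrate the ordering in
 * LWLockReleaseClearVar().  If the reset happened only after the release
 * (LWLockRelease(lock); *valptr = 0;), a new locker could acquire the lock
 * while the variable still carries the old holder's value, and the belated
 * reset could then overwrite whatever the new holder publishes with
 * LWLockUpdateVar():
 *
 *   backend A:  LWLockRelease(lock)
 *   backend B:  LWLockAcquire(lock, LW_EXCLUSIVE)
 *   backend B:  LWLockUpdateVar(lock, valptr, 42)
 *   backend A:  *valptr = 0        <-- clobbers B's 42
 *
 * LWLockReleaseClearVar() instead stores the reset value while the lock is
 * still held (and under lock->mutex), so it cannot overtake the next
 * holder's update.
 */
static void
release_and_go_idle(LWLock *lock, uint64 *valptr)
{
    LWLockReleaseClearVar(lock, valptr, 0);
}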