39
39
#include "access/xloginsert.h"
40
40
#include "access/xlogutils.h"
41
41
#include "miscadmin.h"
42
+ #include "pgstat.h"
42
43
#include "pg_trace.h"
44
+ #include "storage/proc.h"
43
45
44
46
/*
45
47
* Defines for CLOG page sizes. A page is the same BLCKSZ as is used
71
73
#define GetLSNIndex (slotno , xid ) ((slotno) * CLOG_LSNS_PER_PAGE + \
72
74
((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)
73
75
76
+ /*
77
+ * The number of subtransactions below which we consider applying the clog group
78
+ * update optimization. Testing reveals that a number higher than this can
79
+ * hurt performance.
80
+ */
81
+ #define THRESHOLD_SUBTRANS_CLOG_OPT 5
74
82
75
83
/*
76
84
* Link to shared-memory data structures for CLOG control
@@ -87,11 +95,17 @@ static void WriteTruncateXlogRec(int pageno, TransactionId oldestXact,
87
95
Oid oldestXidDb );
88
96
static void TransactionIdSetPageStatus (TransactionId xid , int nsubxids ,
89
97
TransactionId * subxids , XidStatus status ,
90
- XLogRecPtr lsn , int pageno );
98
+ XLogRecPtr lsn , int pageno ,
99
+ bool all_xact_same_page );
91
100
static void TransactionIdSetStatusBit (TransactionId xid , XidStatus status ,
92
101
XLogRecPtr lsn , int slotno );
93
102
static void set_status_by_pages (int nsubxids , TransactionId * subxids ,
94
103
XidStatus status , XLogRecPtr lsn );
104
+ static bool TransactionGroupUpdateXidStatus (TransactionId xid ,
105
+ XidStatus status , XLogRecPtr lsn , int pageno );
106
+ static void TransactionIdSetPageStatusInternal (TransactionId xid , int nsubxids ,
107
+ TransactionId * subxids , XidStatus status ,
108
+ XLogRecPtr lsn , int pageno );
95
109
96
110
97
111
/*
@@ -174,7 +188,7 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
174
188
* Set the parent and all subtransactions in a single call
175
189
*/
176
190
TransactionIdSetPageStatus (xid , nsubxids , subxids , status , lsn ,
177
- pageno );
191
+ pageno , true );
178
192
}
179
193
else
180
194
{
@@ -201,7 +215,7 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
201
215
*/
202
216
pageno = TransactionIdToPage (xid );
203
217
TransactionIdSetPageStatus (xid , nsubxids_on_first_page , subxids , status ,
204
- lsn , pageno );
218
+ lsn , pageno , false );
205
219
206
220
/*
207
221
* Now work through the rest of the subxids one clog page at a time,
@@ -239,31 +253,100 @@ set_status_by_pages(int nsubxids, TransactionId *subxids,
239
253
240
254
TransactionIdSetPageStatus (InvalidTransactionId ,
241
255
num_on_page , subxids + offset ,
242
- status , lsn , pageno );
256
+ status , lsn , pageno , false );
243
257
offset = i ;
244
258
pageno = TransactionIdToPage (subxids [offset ]);
245
259
}
246
260
}
247
261
248
262
/*
249
- * Record the final state of transaction entries in the commit log for
250
- * all entries on a single page. Atomic only on this page.
251
- *
252
- * Otherwise API is same as TransactionIdSetTreeStatus()
263
+ * Record the final state of transaction entries in the commit log for all
264
+ * entries on a single page. Atomic only on this page.
253
265
*/
254
266
static void
255
267
TransactionIdSetPageStatus (TransactionId xid , int nsubxids ,
256
268
TransactionId * subxids , XidStatus status ,
257
- XLogRecPtr lsn , int pageno )
269
+ XLogRecPtr lsn , int pageno ,
270
+ bool all_xact_same_page )
271
+ {
272
+ /* Can't use group update when PGPROC overflows. */
273
+ StaticAssertStmt (THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS ,
274
+ "group clog threshold less than PGPROC cached subxids" );
275
+
276
+ /*
277
+ * When there is contention on CLogControlLock, we try to group multiple
278
+ * updates; a single leader process will perform transaction status
279
+ * updates for multiple backends so that the number of times
280
+ * CLogControlLock needs to be acquired is reduced.
281
+ *
282
+ * For this optimization to be safe, the XID in MyPgXact and the subxids
283
+ * in MyProc must be the same as the ones for which we're setting the
284
+ * status. Check that this is the case.
285
+ *
286
+ * For this optimization to be efficient, we shouldn't have too many
287
+ * sub-XIDs and all of the XIDs for which we're adjusting clog should be
288
+ * on the same page. Check those conditions, too.
289
+ */
290
+ if (all_xact_same_page && xid == MyPgXact -> xid &&
291
+ nsubxids <= THRESHOLD_SUBTRANS_CLOG_OPT &&
292
+ nsubxids == MyPgXact -> nxids &&
293
+ memcmp (subxids , MyProc -> subxids .xids ,
294
+ nsubxids * sizeof (TransactionId )) == 0 )
295
+ {
296
+ /*
297
+ * We don't try to do group update optimization if a process has
298
+ * overflowed the subxids array in its PGPROC, since in that case we
299
+ * don't have a complete list of XIDs for it.
300
+ */
301
+ Assert (THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS );
302
+
303
+ /*
304
+ * If we can immediately acquire CLogControlLock, we update the status
305
+ * of our own XID and release the lock. If not, try use group XID
306
+ * update. If that doesn't work out, fall back to waiting for the
307
+ * lock to perform an update for this transaction only.
308
+ */
309
+ if (LWLockConditionalAcquire (CLogControlLock , LW_EXCLUSIVE ))
310
+ {
311
+ /* Got the lock without waiting! Do the update. */
312
+ TransactionIdSetPageStatusInternal (xid , nsubxids , subxids , status ,
313
+ lsn , pageno );
314
+ LWLockRelease (CLogControlLock );
315
+ return ;
316
+ }
317
+ else if (TransactionGroupUpdateXidStatus (xid , status , lsn , pageno ))
318
+ {
319
+ /* Group update mechanism has done the work. */
320
+ return ;
321
+ }
322
+
323
+ /* Fall through only if update isn't done yet. */
324
+ }
325
+
326
+ /* Group update not applicable, or couldn't accept this page number. */
327
+ LWLockAcquire (CLogControlLock , LW_EXCLUSIVE );
328
+ TransactionIdSetPageStatusInternal (xid , nsubxids , subxids , status ,
329
+ lsn , pageno );
330
+ LWLockRelease (CLogControlLock );
331
+ }
332
+
333
+ /*
334
+ * Record the final state of transaction entry in the commit log
335
+ *
336
+ * We don't do any locking here; caller must handle that.
337
+ */
338
+ static void
339
+ TransactionIdSetPageStatusInternal (TransactionId xid , int nsubxids ,
340
+ TransactionId * subxids , XidStatus status ,
341
+ XLogRecPtr lsn , int pageno )
258
342
{
259
343
int slotno ;
260
344
int i ;
261
345
262
346
Assert (status == TRANSACTION_STATUS_COMMITTED ||
263
347
status == TRANSACTION_STATUS_ABORTED ||
264
348
(status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid (xid )));
265
-
266
- LWLockAcquire (CLogControlLock , LW_EXCLUSIVE );
349
+ Assert (LWLockHeldByMeInMode (CLogControlLock , LW_EXCLUSIVE ));
267
350
268
351
/*
269
352
* If we're doing an async commit (ie, lsn is valid), then we must wait
@@ -311,8 +394,167 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
311
394
}
312
395
313
396
ClogCtl -> shared -> page_dirty [slotno ] = true;
397
+ }
398
+
399
+ /*
400
+ * When we cannot immediately acquire CLogControlLock in exclusive mode at
401
+ * commit time, add ourselves to a list of processes that need their XIDs
402
+ * status update. The first process to add itself to the list will acquire
403
+ * CLogControlLock in exclusive mode and set transaction status as required
404
+ * on behalf of all group members. This avoids a great deal of contention
405
+ * around CLogControlLock when many processes are trying to commit at once,
406
+ * since the lock need not be repeatedly handed off from one committing
407
+ * process to the next.
408
+ *
409
+ * Returns true when transaction status has been updated in clog; returns
410
+ * false if we decided against applying the optimization because the page
411
+ * number we need to update differs from those processes already waiting.
412
+ */
413
+ static bool
414
+ TransactionGroupUpdateXidStatus (TransactionId xid , XidStatus status ,
415
+ XLogRecPtr lsn , int pageno )
416
+ {
417
+ volatile PROC_HDR * procglobal = ProcGlobal ;
418
+ PGPROC * proc = MyProc ;
419
+ uint32 nextidx ;
420
+ uint32 wakeidx ;
421
+
422
+ /* We should definitely have an XID whose status needs to be updated. */
423
+ Assert (TransactionIdIsValid (xid ));
424
+
425
+ /*
426
+ * Add ourselves to the list of processes needing a group XID status
427
+ * update.
428
+ */
429
+ proc -> clogGroupMember = true;
430
+ proc -> clogGroupMemberXid = xid ;
431
+ proc -> clogGroupMemberXidStatus = status ;
432
+ proc -> clogGroupMemberPage = pageno ;
433
+ proc -> clogGroupMemberLsn = lsn ;
434
+
435
+ nextidx = pg_atomic_read_u32 (& procglobal -> clogGroupFirst );
314
436
437
+ while (true)
438
+ {
439
+ /*
440
+ * Add the proc to list, if the clog page where we need to update the
441
+ * current transaction status is same as group leader's clog page.
442
+ *
443
+ * There is a race condition here, which is that after doing the below
444
+ * check and before adding this proc's clog update to a group, the
445
+ * group leader might have already finished the group update for this
446
+ * page and becomes group leader of another group. This will lead to a
447
+ * situation where a single group can have different clog page
448
+ * updates. This isn't likely and will still work, just maybe a bit
449
+ * less efficiently.
450
+ */
451
+ if (nextidx != INVALID_PGPROCNO &&
452
+ ProcGlobal -> allProcs [nextidx ].clogGroupMemberPage != proc -> clogGroupMemberPage )
453
+ {
454
+ proc -> clogGroupMember = false;
455
+ return false;
456
+ }
457
+
458
+ pg_atomic_write_u32 (& proc -> clogGroupNext , nextidx );
459
+
460
+ if (pg_atomic_compare_exchange_u32 (& procglobal -> clogGroupFirst ,
461
+ & nextidx ,
462
+ (uint32 ) proc -> pgprocno ))
463
+ break ;
464
+ }
465
+
466
+ /*
467
+ * If the list was not empty, the leader will update the status of our
468
+ * XID. It is impossible to have followers without a leader because the
469
+ * first process that has added itself to the list will always have
470
+ * nextidx as INVALID_PGPROCNO.
471
+ */
472
+ if (nextidx != INVALID_PGPROCNO )
473
+ {
474
+ int extraWaits = 0 ;
475
+
476
+ /* Sleep until the leader updates our XID status. */
477
+ pgstat_report_wait_start (WAIT_EVENT_CLOG_GROUP_UPDATE );
478
+ for (;;)
479
+ {
480
+ /* acts as a read barrier */
481
+ PGSemaphoreLock (proc -> sem );
482
+ if (!proc -> clogGroupMember )
483
+ break ;
484
+ extraWaits ++ ;
485
+ }
486
+ pgstat_report_wait_end ();
487
+
488
+ Assert (pg_atomic_read_u32 (& proc -> clogGroupNext ) == INVALID_PGPROCNO );
489
+
490
+ /* Fix semaphore count for any absorbed wakeups */
491
+ while (extraWaits -- > 0 )
492
+ PGSemaphoreUnlock (proc -> sem );
493
+ return true;
494
+ }
495
+
496
+ /* We are the leader. Acquire the lock on behalf of everyone. */
497
+ LWLockAcquire (CLogControlLock , LW_EXCLUSIVE );
498
+
499
+ /*
500
+ * Now that we've got the lock, clear the list of processes waiting for
501
+ * group XID status update, saving a pointer to the head of the list.
502
+ * Trying to pop elements one at a time could lead to an ABA problem.
503
+ */
504
+ nextidx = pg_atomic_exchange_u32 (& procglobal -> clogGroupFirst ,
505
+ INVALID_PGPROCNO );
506
+
507
+ /* Remember head of list so we can perform wakeups after dropping lock. */
508
+ wakeidx = nextidx ;
509
+
510
+ /* Walk the list and update the status of all XIDs. */
511
+ while (nextidx != INVALID_PGPROCNO )
512
+ {
513
+ PGPROC * proc = & ProcGlobal -> allProcs [nextidx ];
514
+ PGXACT * pgxact = & ProcGlobal -> allPgXact [nextidx ];
515
+
516
+ /*
517
+ * Overflowed transactions should not use group XID status update
518
+ * mechanism.
519
+ */
520
+ Assert (!pgxact -> overflowed );
521
+
522
+ TransactionIdSetPageStatusInternal (proc -> clogGroupMemberXid ,
523
+ pgxact -> nxids ,
524
+ proc -> subxids .xids ,
525
+ proc -> clogGroupMemberXidStatus ,
526
+ proc -> clogGroupMemberLsn ,
527
+ proc -> clogGroupMemberPage );
528
+
529
+ /* Move to next proc in list. */
530
+ nextidx = pg_atomic_read_u32 (& proc -> clogGroupNext );
531
+ }
532
+
533
+ /* We're done with the lock now. */
315
534
LWLockRelease (CLogControlLock );
535
+
536
+ /*
537
+ * Now that we've released the lock, go back and wake everybody up. We
538
+ * don't do this under the lock so as to keep lock hold times to a
539
+ * minimum.
540
+ */
541
+ while (wakeidx != INVALID_PGPROCNO )
542
+ {
543
+ PGPROC * proc = & ProcGlobal -> allProcs [wakeidx ];
544
+
545
+ wakeidx = pg_atomic_read_u32 (& proc -> clogGroupNext );
546
+ pg_atomic_write_u32 (& proc -> clogGroupNext , INVALID_PGPROCNO );
547
+
548
+ /* ensure all previous writes are visible before follower continues. */
549
+ pg_write_barrier ();
550
+
551
+ proc -> clogGroupMember = false;
552
+
553
+ if (proc != MyProc )
554
+ PGSemaphoreUnlock (proc -> sem );
555
+ }
556
+
557
+ return true;
316
558
}
317
559
318
560
/*
0 commit comments