@@ -167,6 +167,9 @@ static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
167
167
static TransactionId KnownAssignedXidsGetOldestXmin (void );
168
168
static void KnownAssignedXidsDisplay (int trace_level );
169
169
static void KnownAssignedXidsReset (void );
170
+ static inline void ProcArrayEndTransactionInternal (PGPROC * proc ,
171
+ PGXACT * pgxact , TransactionId latestXid );
172
+ static void ProcArrayGroupClearXid (PGPROC * proc , TransactionId latestXid );
170
173
171
174
/*
172
175
* Report shared-memory space needed by CreateSharedProcArray.
@@ -399,26 +402,18 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
399
402
*/
400
403
Assert (TransactionIdIsValid (allPgXact [proc -> pgprocno ].xid ));
401
404
402
- LWLockAcquire (ProcArrayLock , LW_EXCLUSIVE );
403
-
404
- pgxact -> xid = InvalidTransactionId ;
405
- proc -> lxid = InvalidLocalTransactionId ;
406
- pgxact -> xmin = InvalidTransactionId ;
407
- /* must be cleared with xid/xmin: */
408
- pgxact -> vacuumFlags &= ~PROC_VACUUM_STATE_MASK ;
409
- pgxact -> delayChkpt = false; /* be sure this is cleared in abort */
410
- proc -> recoveryConflictPending = false;
411
-
412
- /* Clear the subtransaction-XID cache too while holding the lock */
413
- pgxact -> nxids = 0 ;
414
- pgxact -> overflowed = false;
415
-
416
- /* Also advance global latestCompletedXid while holding the lock */
417
- if (TransactionIdPrecedes (ShmemVariableCache -> latestCompletedXid ,
418
- latestXid ))
419
- ShmemVariableCache -> latestCompletedXid = latestXid ;
420
-
421
- LWLockRelease (ProcArrayLock );
405
+ /*
406
+ * If we can immediately acquire ProcArrayLock, we clear our own XID
407
+ * and release the lock. If not, use group XID clearing to improve
408
+ * efficiency.
409
+ */
410
+ if (LWLockConditionalAcquire (ProcArrayLock , LW_EXCLUSIVE ))
411
+ {
412
+ ProcArrayEndTransactionInternal (proc , pgxact , latestXid );
413
+ LWLockRelease (ProcArrayLock );
414
+ }
415
+ else
416
+ ProcArrayGroupClearXid (proc , latestXid );
422
417
}
423
418
else
424
419
{
@@ -441,6 +436,137 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
441
436
}
442
437
}
443
438
439
+ /*
440
+ * Mark a write transaction as no longer running.
441
+ *
442
+ * We don't do any locking here; caller must handle that.
443
+ */
444
+ static inline void
445
+ ProcArrayEndTransactionInternal (PGPROC * proc , PGXACT * pgxact ,
446
+ TransactionId latestXid )
447
+ {
448
+ pgxact -> xid = InvalidTransactionId ;
449
+ proc -> lxid = InvalidLocalTransactionId ;
450
+ pgxact -> xmin = InvalidTransactionId ;
451
+ /* must be cleared with xid/xmin: */
452
+ pgxact -> vacuumFlags &= ~PROC_VACUUM_STATE_MASK ;
453
+ pgxact -> delayChkpt = false; /* be sure this is cleared in abort */
454
+ proc -> recoveryConflictPending = false;
455
+
456
+ /* Clear the subtransaction-XID cache too while holding the lock */
457
+ pgxact -> nxids = 0 ;
458
+ pgxact -> overflowed = false;
459
+
460
+ /* Also advance global latestCompletedXid while holding the lock */
461
+ if (TransactionIdPrecedes (ShmemVariableCache -> latestCompletedXid ,
462
+ latestXid ))
463
+ ShmemVariableCache -> latestCompletedXid = latestXid ;
464
+ }
465
+
466
+ /*
467
+ * ProcArrayGroupClearXid -- group XID clearing
468
+ *
469
+ * When we cannot immediately acquire ProcArrayLock in exclusive mode at
470
+ * commit time, add ourselves to a list of processes that need their XIDs
471
+ * cleared. The first process to add itself to the list will acquire
472
+ * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
473
+ * on behalf of all group members. This avoids a great deal of context
474
+ * switching when many processes are trying to commit at once, since the lock
475
+ * only needs to be handed from the last share-locker to one process waiting
476
+ * for the exclusive lock, rather than to each one in turn.
477
+ */
478
+ static void
479
+ ProcArrayGroupClearXid (PGPROC * proc , TransactionId latestXid )
480
+ {
481
+ volatile PROC_HDR * procglobal = ProcGlobal ;
482
+ uint32 nextidx ;
483
+ uint32 wakeidx ;
484
+ int extraWaits = -1 ;
485
+
486
+ /* We should definitely have an XID to clear. */
487
+ Assert (TransactionIdIsValid (allPgXact [proc -> pgprocno ].xid ));
488
+
489
+ /* Add ourselves to the list of processes needing a group XID clear. */
490
+ proc -> backendLatestXid = latestXid ;
491
+ while (true)
492
+ {
493
+ nextidx = pg_atomic_read_u32 (& procglobal -> nextClearXidElem );
494
+ pg_atomic_write_u32 (& proc -> nextClearXidElem , nextidx );
495
+
496
+ if (pg_atomic_compare_exchange_u32 (& procglobal -> nextClearXidElem ,
497
+ & nextidx ,
498
+ (uint32 ) proc -> pgprocno ))
499
+ break ;
500
+ }
501
+
502
+ /* If the list was not empty, the leader will clear our XID. */
503
+ if (nextidx != INVALID_PGPROCNO )
504
+ {
505
+ /* Sleep until the leader clears our XID. */
506
+ while (pg_atomic_read_u32 (& proc -> nextClearXidElem ) != INVALID_PGPROCNO )
507
+ {
508
+ extraWaits ++ ;
509
+ PGSemaphoreLock (& proc -> sem );
510
+ }
511
+
512
+ /* Fix semaphore count for any absorbed wakeups */
513
+ while (extraWaits -- > 0 )
514
+ PGSemaphoreUnlock (& proc -> sem );
515
+ return ;
516
+ }
517
+
518
+ /* We are the leader. Acquire the lock on behalf of everyone. */
519
+ LWLockAcquire (ProcArrayLock , LW_EXCLUSIVE );
520
+
521
+ /*
522
+ * Now that we've got the lock, clear the list of processes waiting for
523
+ * group XID clearing, saving a pointer to the head of the list.
524
+ */
525
+ while (true)
526
+ {
527
+ nextidx = pg_atomic_read_u32 (& procglobal -> nextClearXidElem );
528
+ if (pg_atomic_compare_exchange_u32 (& procglobal -> nextClearXidElem ,
529
+ & nextidx ,
530
+ INVALID_PGPROCNO ))
531
+ break ;
532
+ }
533
+
534
+ /* Remember head of list so we can perform wakeups after dropping lock. */
535
+ wakeidx = nextidx ;
536
+
537
+ /* Walk the list and clear all XIDs. */
538
+ while (nextidx != INVALID_PGPROCNO )
539
+ {
540
+ PGPROC * proc = & allProcs [nextidx ];
541
+ PGXACT * pgxact = & allPgXact [nextidx ];
542
+
543
+ ProcArrayEndTransactionInternal (proc , pgxact , proc -> backendLatestXid );
544
+
545
+ /* Move to next proc in list. */
546
+ nextidx = pg_atomic_read_u32 (& proc -> nextClearXidElem );
547
+ }
548
+
549
+ /* We're done with the lock now. */
550
+ LWLockRelease (ProcArrayLock );
551
+
552
+ /*
553
+ * Now that we've released the lock, go back and wake everybody up. We
554
+ * don't do this under the lock so as to keep lock hold times to a
555
+ * minimum. The system calls we need to perform to wake other processes
556
+ * up are probably much slower than the simple memory writes we did while
557
+ * holding the lock.
558
+ */
559
+ while (wakeidx != INVALID_PGPROCNO )
560
+ {
561
+ PGPROC * proc = & allProcs [wakeidx ];
562
+
563
+ wakeidx = pg_atomic_read_u32 (& proc -> nextClearXidElem );
564
+ pg_atomic_write_u32 (& proc -> nextClearXidElem , INVALID_PGPROCNO );
565
+
566
+ if (proc != MyProc )
567
+ PGSemaphoreUnlock (& proc -> sem );
568
+ }
569
+ }
444
570
445
571
/*
446
572
* ProcArrayClearTransaction -- clear the transaction fields
0 commit comments