29
29
#include "storage/procarray.h"
30
30
#include "storage/sinvaladt.h"
31
31
#include "storage/standby.h"
32
+ #include "utils/hsearch.h"
33
+ #include "utils/memutils.h"
32
34
#include "utils/ps_status.h"
33
35
#include "utils/timeout.h"
34
36
#include "utils/timestamp.h"
@@ -38,14 +40,22 @@ int vacuum_defer_cleanup_age;
38
40
int max_standby_archive_delay = 30 * 1000 ;
39
41
int max_standby_streaming_delay = 30 * 1000 ;
40
42
41
- static List * RecoveryLockList ;
43
+ static HTAB * RecoveryLockLists ;
42
44
43
45
static void ResolveRecoveryConflictWithVirtualXIDs (VirtualTransactionId * waitlist ,
44
46
ProcSignalReason reason );
45
47
static void SendRecoveryConflictWithBufferPin (ProcSignalReason reason );
46
48
static XLogRecPtr LogCurrentRunningXacts (RunningTransactions CurrRunningXacts );
47
49
static void LogAccessExclusiveLocks (int nlocks , xl_standby_lock * locks );
48
50
51
+ /*
52
+ * Keep track of all the locks owned by a given transaction.
53
+ */
54
+ typedef struct RecoveryLockListsEntry
55
+ {
56
+ TransactionId xid ;
57
+ List * locks ;
58
+ } RecoveryLockListsEntry ;
49
59
50
60
/*
51
61
* InitRecoveryTransactionEnvironment
63
73
InitRecoveryTransactionEnvironment (void )
64
74
{
65
75
VirtualTransactionId vxid ;
76
+ HASHCTL hash_ctl ;
77
+
78
+ /*
79
+ * Initialize the hash table for tracking the list of locks held by each
80
+ * transaction.
81
+ */
82
+ memset (& hash_ctl , 0 , sizeof (hash_ctl ));
83
+ hash_ctl .keysize = sizeof (TransactionId );
84
+ hash_ctl .entrysize = sizeof (RecoveryLockListsEntry );
85
+ RecoveryLockLists = hash_create ("RecoveryLockLists" ,
86
+ 64 ,
87
+ & hash_ctl ,
88
+ HASH_ELEM | HASH_BLOBS );
66
89
67
90
/*
68
91
* Initialize shared invalidation management for Startup process, being
@@ -107,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
107
130
/* Release all locks the tracked transactions were holding */
108
131
StandbyReleaseAllLocks ();
109
132
133
+ /* Destroy the hash table of locks. */
134
+ hash_destroy (RecoveryLockLists );
135
+ RecoveryLockLists = NULL ;
136
+
110
137
/* Cleanup our VirtualTransaction */
111
138
VirtualXactLockTableCleanup ();
112
139
}
@@ -586,8 +613,8 @@ StandbyLockTimeoutHandler(void)
586
613
* We only keep track of AccessExclusiveLocks, which are only ever held by
587
614
* one transaction on one relation.
588
615
*
589
- * We keep a single dynamically expandible list of locks in local memory,
590
- * RecoveryLockList , so we can keep track of the various entries made by
616
+ * We keep a hash table of lists of locks in local memory keyed by xid ,
617
+ * RecoveryLockLists , so we can keep track of the various entries made by
591
618
* the Startup process's virtual xid in the shared lock table.
592
619
*
593
620
* List elements use type xl_standby_lock, since the WAL record type exactly
@@ -601,8 +628,10 @@ StandbyLockTimeoutHandler(void)
601
628
void
602
629
StandbyAcquireAccessExclusiveLock (TransactionId xid , Oid dbOid , Oid relOid )
603
630
{
631
+ RecoveryLockListsEntry * entry ;
604
632
xl_standby_lock * newlock ;
605
633
LOCKTAG locktag ;
634
+ bool found ;
606
635
607
636
/* Already processed? */
608
637
if (!TransactionIdIsValid (xid ) ||
@@ -616,58 +645,68 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
616
645
/* dbOid is InvalidOid when we are locking a shared relation. */
617
646
Assert (OidIsValid (relOid ));
618
647
648
+ /* Create a new list for this xid, if we don't have one already. */
649
+ entry = hash_search (RecoveryLockLists , & xid , HASH_ENTER , & found );
650
+ if (!found )
651
+ {
652
+ entry -> xid = xid ;
653
+ entry -> locks = NIL ;
654
+ }
655
+
619
656
newlock = palloc (sizeof (xl_standby_lock ));
620
657
newlock -> xid = xid ;
621
658
newlock -> dbOid = dbOid ;
622
659
newlock -> relOid = relOid ;
623
- RecoveryLockList = lappend (RecoveryLockList , newlock );
660
+ entry -> locks = lappend (entry -> locks , newlock );
624
661
625
662
SET_LOCKTAG_RELATION (locktag , newlock -> dbOid , newlock -> relOid );
626
663
627
664
LockAcquireExtended (& locktag , AccessExclusiveLock , true, false, false);
628
665
}
629
666
630
667
static void
631
- StandbyReleaseLocks ( TransactionId xid )
668
+ StandbyReleaseLockList ( List * locks )
632
669
{
633
- ListCell * cell ,
634
- * prev ,
635
- * next ;
636
-
637
- /*
638
- * Release all matching locks and remove them from list
639
- */
640
- prev = NULL ;
641
- for (cell = list_head (RecoveryLockList ); cell ; cell = next )
670
+ while (locks )
642
671
{
643
- xl_standby_lock * lock = (xl_standby_lock * ) lfirst (cell );
672
+ xl_standby_lock * lock = (xl_standby_lock * ) linitial (locks );
673
+ LOCKTAG locktag ;
674
+ elog (trace_recovery (DEBUG4 ),
675
+ "releasing recovery lock: xid %u db %u rel %u" ,
676
+ lock -> xid , lock -> dbOid , lock -> relOid );
677
+ SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
678
+ if (!LockRelease (& locktag , AccessExclusiveLock , true))
679
+ {
680
+ elog (LOG ,
681
+ "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
682
+ lock -> xid , lock -> dbOid , lock -> relOid );
683
+ Assert (false);
684
+ }
685
+ pfree (lock );
686
+ locks = list_delete_first (locks );
687
+ }
688
+ }
644
689
645
- next = lnext (cell );
690
+ static void
691
+ StandbyReleaseLocks (TransactionId xid )
692
+ {
693
+ RecoveryLockListsEntry * entry ;
646
694
647
- if (!TransactionIdIsValid (xid ) || lock -> xid == xid )
695
+ if (TransactionIdIsValid (xid ))
696
+ {
697
+ if ((entry = hash_search (RecoveryLockLists , & xid , HASH_FIND , NULL )))
648
698
{
649
- LOCKTAG locktag ;
650
-
651
- elog (trace_recovery (DEBUG4 ),
652
- "releasing recovery lock: xid %u db %u rel %u" ,
653
- lock -> xid , lock -> dbOid , lock -> relOid );
654
- SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
655
- if (!LockRelease (& locktag , AccessExclusiveLock , true))
656
- elog (LOG ,
657
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
658
- lock -> xid , lock -> dbOid , lock -> relOid );
659
-
660
- RecoveryLockList = list_delete_cell (RecoveryLockList , cell , prev );
661
- pfree (lock );
699
+ StandbyReleaseLockList (entry -> locks );
700
+ hash_search (RecoveryLockLists , entry , HASH_REMOVE , NULL );
662
701
}
663
- else
664
- prev = cell ;
665
702
}
703
+ else
704
+ StandbyReleaseAllLocks ();
666
705
}
667
706
668
707
/*
669
708
* Release locks for a transaction tree, starting at xid down, from
670
- * RecoveryLockList .
709
+ * RecoveryLockLists .
671
710
*
672
711
* Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
673
712
* to remove any AccessExclusiveLocks requested by a transaction.
@@ -689,30 +728,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
689
728
void
690
729
StandbyReleaseAllLocks (void )
691
730
{
692
- ListCell * cell ,
693
- * prev ,
694
- * next ;
695
- LOCKTAG locktag ;
731
+ HASH_SEQ_STATUS status ;
732
+ RecoveryLockListsEntry * entry ;
696
733
697
734
elog (trace_recovery (DEBUG2 ), "release all standby locks" );
698
735
699
- prev = NULL ;
700
- for ( cell = list_head ( RecoveryLockList ); cell ; cell = next )
736
+ hash_seq_init ( & status , RecoveryLockLists ) ;
737
+ while (( entry = hash_seq_search ( & status )) )
701
738
{
702
- xl_standby_lock * lock = (xl_standby_lock * ) lfirst (cell );
703
-
704
- next = lnext (cell );
705
-
706
- elog (trace_recovery (DEBUG4 ),
707
- "releasing recovery lock: xid %u db %u rel %u" ,
708
- lock -> xid , lock -> dbOid , lock -> relOid );
709
- SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
710
- if (!LockRelease (& locktag , AccessExclusiveLock , true))
711
- elog (LOG ,
712
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
713
- lock -> xid , lock -> dbOid , lock -> relOid );
714
- RecoveryLockList = list_delete_cell (RecoveryLockList , cell , prev );
715
- pfree (lock );
739
+ StandbyReleaseLockList (entry -> locks );
740
+ hash_search (RecoveryLockLists , entry , HASH_REMOVE , NULL );
716
741
}
717
742
}
718
743
@@ -724,22 +749,17 @@ StandbyReleaseAllLocks(void)
724
749
void
725
750
StandbyReleaseOldLocks (int nxids , TransactionId * xids )
726
751
{
727
- ListCell * cell ,
728
- * prev ,
729
- * next ;
730
- LOCKTAG locktag ;
752
+ HASH_SEQ_STATUS status ;
753
+ RecoveryLockListsEntry * entry ;
731
754
732
- prev = NULL ;
733
- for ( cell = list_head ( RecoveryLockList ); cell ; cell = next )
755
+ hash_seq_init ( & status , RecoveryLockLists ) ;
756
+ while (( entry = hash_seq_search ( & status )) )
734
757
{
735
- xl_standby_lock * lock = (xl_standby_lock * ) lfirst (cell );
736
758
bool remove = false;
737
759
738
- next = lnext ( cell );
760
+ Assert ( TransactionIdIsValid ( entry -> xid ) );
739
761
740
- Assert (TransactionIdIsValid (lock -> xid ));
741
-
742
- if (StandbyTransactionIdIsPrepared (lock -> xid ))
762
+ if (StandbyTransactionIdIsPrepared (entry -> xid ))
743
763
remove = false;
744
764
else
745
765
{
@@ -748,7 +768,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
748
768
749
769
for (i = 0 ; i < nxids ; i ++ )
750
770
{
751
- if (lock -> xid == xids [i ])
771
+ if (entry -> xid == xids [i ])
752
772
{
753
773
found = true;
754
774
break ;
@@ -764,19 +784,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
764
784
765
785
if (remove )
766
786
{
767
- elog (trace_recovery (DEBUG4 ),
768
- "releasing recovery lock: xid %u db %u rel %u" ,
769
- lock -> xid , lock -> dbOid , lock -> relOid );
770
- SET_LOCKTAG_RELATION (locktag , lock -> dbOid , lock -> relOid );
771
- if (!LockRelease (& locktag , AccessExclusiveLock , true))
772
- elog (LOG ,
773
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u" ,
774
- lock -> xid , lock -> dbOid , lock -> relOid );
775
- RecoveryLockList = list_delete_cell (RecoveryLockList , cell , prev );
776
- pfree (lock );
787
+ StandbyReleaseLockList (entry -> locks );
788
+ hash_search (RecoveryLockLists , entry , HASH_REMOVE , NULL );
777
789
}
778
- else
779
- prev = cell ;
780
790
}
781
791
}
782
792
0 commit comments