@@ -696,6 +696,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
696
696
ts -> votingCompleted = false;
697
697
ts -> cmd = MSG_INVALID ;
698
698
ts -> nSubxids = xactGetCommittedChildren (& subxids );
699
+ Mtm -> nActiveTransactions += 1 ;
699
700
700
701
x -> isPrepared = true;
701
702
x -> csn = ts -> csn ;
@@ -795,6 +796,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
795
796
ts -> status = TRANSACTION_STATUS_ABORTED ;
796
797
}
797
798
MtmAdjustSubtransactions (ts );
799
+ Assert (Mtm -> nActiveTransactions != 0 );
800
+ Mtm -> nActiveTransactions -= 1 ;
798
801
}
799
802
if (!commit && x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
800
803
/*
@@ -836,6 +839,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
836
839
}
837
840
}
838
841
842
+ void MtmRecoveryCompleted (void )
843
+ {
844
+ elog (WARNING , "Recevoery of node %d is completed" , MtmNodeId );
845
+ Mtm -> recoverySlot = 0 ;
846
+ MtmSwitchClusterMode (MTM_ONLINE );
847
+ }
848
+
839
849
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
840
850
{
841
851
MtmLock (LW_EXCLUSIVE );
@@ -847,8 +857,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
847
857
Assert (Mtm -> status == MTM_RECOVERY );
848
858
} else if (Mtm -> status == MTM_RECOVERY ) {
849
859
/* When recovery is completed we get normal transaction ID and switch to normal mode */
850
- Mtm -> recoverySlot = 0 ;
851
- MtmSwitchClusterMode (MTM_ONLINE );
860
+ MtmRecoveryCompleted ();
852
861
}
853
862
MtmTx .gtid = * gtid ;
854
863
MtmTx .xid = GetCurrentTransactionId ();
@@ -973,35 +982,52 @@ static int64 MtmGetSlotLag(int nodeId)
973
982
*/
974
983
bool MtmIsRecoveredNode (int nodeId )
975
984
{
976
- return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) ;
985
+ return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 );
977
986
}
978
987
979
988
980
- void MtmRecoveryPorgress ( XLogRecPtr lsn )
989
+ bool MtmRecoveryCaughtUp ( int nodeId , XLogRecPtr slotLSN )
981
990
{
982
-
983
- Assert (MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
984
- if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
985
- && MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
991
+ bool caughtUp = false;
992
+ if (MtmIsRecoveredNode (nodeId )) {
993
+ XLogRecPtr walLSN = GetXLogInsertRecPtr ();
994
+ MtmLock (LW_EXCLUSIVE );
995
+ if (slotLSN == walLSN ) {
996
+ if (BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )) {
997
+ elog (WARNING ,"Node %d is caught-up" , nodeId );
998
+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
999
+ BIT_CLEAR (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
1000
+ BIT_CLEAR (Mtm -> nodeLockerMask , nodeId - 1 );
1001
+ Mtm -> nLockers -= 1 ;
1002
+ } else {
1003
+ elog (WARNING ,"Node %d is caugth-up without locking cluster" , nodeId );
1004
+ /* We are lucky: caugth-up without locking cluster! */
1005
+ Mtm -> nNodes += 1 ;
1006
+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1007
+ }
1008
+ caughtUp = true;
1009
+ } else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
1010
+ && slotLSN + MtmMinRecoveryLag > walLSN )
986
1011
{
987
1012
/*
988
1013
* Wal sender almost catched up.
989
1014
* Lock cluster preventing new transaction to start until wal is completely replayed.
990
1015
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
991
1016
* Is there some better way to establish mapping between nodes ad WAL-seconder?
992
1017
*/
993
- elog (WARNING ,"Node %d is catching up " , nodeId );
994
- MtmLock ( LW_EXCLUSIVE );
1018
+ elog (WARNING ,"Node %d is almost caught-up: lock cluster " , nodeId );
1019
+ Assert ( MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
995
1020
BIT_SET (Mtm -> nodeLockerMask , nodeId - 1 );
996
1021
BIT_SET (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
997
1022
Mtm -> nLockers += 1 ;
998
- MtmUnlock ();
999
1023
} else {
1000
- MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" , nodeId , MyWalSnd -> sentPtr , GetXLogInsertRecPtr () , Mtm -> nLockers );
1024
+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d \n" , nodeId , slotLSN , walLSN , MyWalSnd -> sentPtr , Mtm -> nLockers , Mtm -> nActiveTransactions );
1001
1025
}
1002
- return true;
1026
+ MtmUnlock ();
1027
+ } else {
1028
+ MTM_INFO ("Node %d is not in recovery mode\n" , nodeId );
1003
1029
}
1004
- return false ;
1030
+ return caughtUp ;
1005
1031
}
1006
1032
1007
1033
void MtmSwitchClusterMode (MtmNodeStatus mode )
@@ -1020,22 +1046,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
1020
1046
static void
1021
1047
MtmCheckClusterLock ()
1022
1048
{
1049
+ timestamp_t delay = MIN_WAIT_TIMEOUT ;
1023
1050
while (true)
1024
1051
{
1025
1052
nodemask_t mask = Mtm -> walSenderLockerMask ;
1026
1053
if (mask != 0 ) {
1027
- XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1028
- int i ;
1029
- timestamp_t delay = MIN_WAIT_TIMEOUT ;
1030
- for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1031
- if (mask & 1 ) {
1032
- if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1033
- /* recovery is in progress */
1034
- break ;
1035
- } else {
1036
- /* recovered replica catched up with master */
1037
- elog (WARNING , "WAL-sender %d complete recovery" , i );
1038
- BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1054
+ if (Mtm -> nActiveTransactions == 0 ) {
1055
+ XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1056
+ int i ;
1057
+ for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1058
+ if (mask & 1 ) {
1059
+ if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1060
+ /* recovery is in progress */
1061
+ break ;
1062
+ } else {
1063
+ /* recovered replica catched up with master */
1064
+ elog (WARNING , "WAL-sender %d complete recovery" , i );
1065
+ BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1066
+ }
1039
1067
}
1040
1068
}
1041
1069
}
@@ -1265,6 +1293,7 @@ static void MtmInitialize()
1265
1293
Mtm -> walSenderLockerMask = 0 ;
1266
1294
Mtm -> nodeLockerMask = 0 ;
1267
1295
Mtm -> nLockers = 0 ;
1296
+ Mtm -> nActiveTransactions = 0 ;
1268
1297
Mtm -> votingTransactions = NULL ;
1269
1298
Mtm -> transListHead = NULL ;
1270
1299
Mtm -> transListTail = & Mtm -> transListHead ;
@@ -1705,12 +1734,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
1705
1734
static void
1706
1735
MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
1707
1736
{
1737
+ ListCell * param ;
1738
+ bool isRecoverySession = false;
1739
+ foreach (param , args -> in_params )
1740
+ {
1741
+ DefElem * elem = lfirst (param );
1742
+ if (strcmp ("mtm_replication_mode" , elem -> defname ) == 0 ) {
1743
+ isRecoverySession = elem -> arg != NULL && strVal (elem -> arg ) != NULL && strcmp (strVal (elem -> arg ), "recovery" ) == 0 ;
1744
+ break ;
1745
+ }
1746
+ }
1708
1747
MtmLock (LW_EXCLUSIVE );
1709
- if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1710
- elog (WARNING , "Recovery of node %d is completed: start normal replication" , MtmReplicationNodeId );
1748
+ if (isRecoverySession ) {
1749
+ elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
1750
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1751
+ BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1752
+ Mtm -> nNodes -= 1 ;
1753
+ MtmCheckQuorum ();
1754
+ }
1755
+ } else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1756
+ elog (WARNING , "Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
1711
1757
BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1712
1758
Mtm -> nNodes += 1 ;
1713
1759
MtmCheckQuorum ();
1760
+ } else {
1761
+ elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
1714
1762
}
1715
1763
MtmUnlock ();
1716
1764
}
@@ -1728,7 +1776,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1728
1776
bool res = Mtm -> status != MTM_RECOVERY
1729
1777
&& (args -> origin_id == InvalidRepOriginId
1730
1778
|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1731
- MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1779
+ MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1732
1780
return res ;
1733
1781
}
1734
1782
0 commit comments