@@ -636,9 +636,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
636
636
x -> isDistributed = MtmIsUserTransaction ();
637
637
x -> isPrepared = false;
638
638
x -> isTransactionBlock = IsTransactionBlock ();
639
- /* Application name can be cahnged usnig PGAPPNAME environment variable */
639
+ /* Application name can be changed usnig PGAPPNAME environment variable */
640
640
if (!IsBackgroundWorker && x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 ) {
641
- /* reject all user's transactions at offline cluster */
641
+ /* Reject all user's transactions at offline cluster.
642
+ * Allow execution of transaction by bg-workers to make it possible to perform recovery.
643
+ */
642
644
MtmUnlock ();
643
645
elog (ERROR , "Multimaster node is not online: current status %s" , MtmNodeStatusMnem [Mtm -> status ]);
644
646
}
@@ -674,14 +676,17 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
674
676
if (Mtm -> disabledNodeMask != 0 ) {
675
677
MtmRefreshClusterStatus (true);
676
678
if (!IsBackgroundWorker && Mtm -> status != MTM_ONLINE ) {
677
- elog (ERROR , "Abort current transaction because this cluster node is not online" );
679
+ /* Do not take in accoutn bg-workers which are performing recovery */
680
+ elog (ERROR , "Abort current transaction because this cluster node is in %s status" , MtmNodeStatusMnem [Mtm -> status ]);
678
681
}
679
682
}
680
683
681
684
MtmLock (LW_EXCLUSIVE );
682
685
683
686
/*
684
- * Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
687
+ * Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
688
+ * Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
689
+ * and should not cause cluster status change.
685
690
*/
686
691
if (!x -> isReplicated ) {
687
692
MtmCheckClusterLock ();
@@ -717,7 +722,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
717
722
}
718
723
MtmTransactionListAppend (ts );
719
724
MtmAddSubtransactions (ts , subxids , ts -> nSubxids );
720
- MTM_TRACE ("%d: MtmPrePrepareTransaction prepare commit of %d CSN=%ld\n" , MyProcPid , x -> xid , ts -> csn );
725
+ MTM_TRACE ("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n" ,
726
+ MyProcPid , x -> xid , ts -> gtid .xid , ts -> gtid .node , ts -> csn );
721
727
MtmUnlock ();
722
728
723
729
}
@@ -843,14 +849,6 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
843
849
}
844
850
}
845
851
846
- void MtmRecoveryCompleted (void )
847
- {
848
- elog (WARNING , "Recovery of node %d is completed" , MtmNodeId );
849
- Mtm -> recoverySlot = 0 ;
850
- BIT_CLEAR (Mtm -> disabledNodeMask , MtmNodeId - 1 );
851
- MtmSwitchClusterMode (MTM_ONLINE );
852
- }
853
-
854
852
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
855
853
{
856
854
MtmLock (LW_EXCLUSIVE );
@@ -934,6 +932,18 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
934
932
* -------------------------------------------
935
933
*/
936
934
935
+ void MtmRecoveryCompleted (void )
936
+ {
937
+ elog (WARNING , "Recovery of node %d is completed" , MtmNodeId );
938
+ MtmLock (LW_EXCLUSIVE );
939
+ Mtm -> recoverySlot = 0 ;
940
+ BIT_CLEAR (Mtm -> disabledNodeMask , MtmNodeId - 1 );
941
+ /* Mode will be changed to online once all locagical reciever are connected */
942
+ MtmSwitchClusterMode (MTM_CONNECTED );
943
+ MtmUnlock ();
944
+ }
945
+
946
+
937
947
938
948
/**
939
949
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
@@ -994,10 +1004,10 @@ bool MtmIsRecoveredNode(int nodeId)
994
1004
bool MtmRecoveryCaughtUp (int nodeId , XLogRecPtr slotLSN )
995
1005
{
996
1006
bool caughtUp = false;
1007
+ MtmLock (LW_EXCLUSIVE );
997
1008
if (MtmIsRecoveredNode (nodeId )) {
998
1009
XLogRecPtr walLSN = GetXLogInsertRecPtr ();
999
- MtmLock (LW_EXCLUSIVE );
1000
- if (slotLSN == walLSN ) {
1010
+ if (slotLSN == walLSN && Mtm -> nActiveTransactions == 0 ) {
1001
1011
if (BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )) {
1002
1012
elog (WARNING ,"Node %d is caught-up" , nodeId );
1003
1013
BIT_CLEAR (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
@@ -1019,18 +1029,17 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1019
1029
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
1020
1030
* Is there some better way to establish mapping between nodes ad WAL-seconder?
1021
1031
*/
1022
- elog (WARNING ,"Node %d is almost caught-up: lock cluster" , nodeId );
1032
+ elog (WARNING ,"Node %d is almost caught-up: slot position %lx, WAL position %lx, active transactions %d" ,
1033
+ nodeId , slotLSN , walLSN , Mtm -> nActiveTransactions );
1023
1034
Assert (MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
1024
1035
BIT_SET (Mtm -> nodeLockerMask , nodeId - 1 );
1025
1036
BIT_SET (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
1026
1037
Mtm -> nLockers += 1 ;
1027
1038
} else {
1028
1039
MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d\n" , nodeId , slotLSN , walLSN , MyWalSnd -> sentPtr , Mtm -> nLockers , Mtm -> nActiveTransactions );
1029
1040
}
1030
- MtmUnlock ();
1031
- } else {
1032
- MTM_INFO ("Node %d is not in recovery mode\n" , nodeId );
1033
1041
}
1042
+ MtmUnlock ();
1034
1043
return caughtUp ;
1035
1044
}
1036
1045
@@ -1045,7 +1054,7 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
1045
1054
/*
1046
1055
* If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
1047
1056
* WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
1048
- * This function is called at transaction start with multimaster lock set
1057
+ * This function is called before transaction prepare with multimaster lock set.
1049
1058
*/
1050
1059
static void
1051
1060
MtmCheckClusterLock ()
@@ -1072,8 +1081,8 @@ MtmCheckClusterLock()
1072
1081
}
1073
1082
}
1074
1083
if (mask != 0 ) {
1075
- /* some "almost catch-up" wal-senders are still working */
1076
- /* Do not start new transactions until them complete */
1084
+ /* some "almost catch-up" wal-senders are still working. */
1085
+ /* Do not start new transactions until them are completed. */
1077
1086
MtmUnlock ();
1078
1087
MtmSleep (delay );
1079
1088
if (delay * 2 <= MAX_WAIT_TIMEOUT ) {
@@ -1216,6 +1225,7 @@ void MtmOnNodeDisconnect(int nodeId)
1216
1225
void MtmOnNodeConnect (int nodeId )
1217
1226
{
1218
1227
BIT_CLEAR (Mtm -> connectivityMask , nodeId - 1 );
1228
+ elog (NOTICE , "Reconnect node %d" , nodeId );
1219
1229
RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1220
1230
}
1221
1231
@@ -1646,19 +1656,23 @@ _PG_fini(void)
1646
1656
}
1647
1657
1648
1658
1649
-
1659
+ /*
1660
+ * This functions is called by pglogical receiver main function when receiver background worker is started.
1661
+ * We switch to ONLINE mode when all receviers are connected.
1662
+ * As far as background worker can be restarted multiple times, use node bitmask.
1663
+ */
1650
1664
void MtmReceiverStarted (int nodeId )
1651
1665
{
1652
- SpinLockAcquire ( & Mtm -> spinlock );
1666
+ MtmLock ( LW_EXCLUSIVE );
1653
1667
if (!BIT_CHECK (Mtm -> pglogicalNodeMask , nodeId - 1 )) {
1654
1668
BIT_SET (Mtm -> pglogicalNodeMask , nodeId - 1 );
1655
1669
if (++ Mtm -> nReceivers == Mtm -> nNodes - 1 ) {
1656
1670
if (Mtm -> status == MTM_CONNECTED ) {
1657
1671
MtmSwitchClusterMode (MTM_ONLINE );
1658
1672
}
1659
1673
}
1660
- }
1661
- SpinLockRelease ( & Mtm -> spinlock );
1674
+ }
1675
+ MtmUnlock ();
1662
1676
}
1663
1677
1664
1678
/*
0 commit comments