@@ -192,6 +192,8 @@ int MtmReconnectAttempts;
192
192
int MtmNodeDisableDelay ;
193
193
int MtmTransSpillThreshold ;
194
194
int MtmMaxNodes ;
195
+ int MtmHeartbeatSendTimeout ; /* period of broadcasting heartbeat messages, in milliseconds (GUC multimaster.heartbeat_send_timeout) */
196
+ int MtmHeartbeatRecvTimeout ; /* a node is assumed dead if no heartbeat is received from it within this many milliseconds (GUC multimaster.heartbeat_recv_timeout) */
195
197
bool MtmUseRaftable ;
196
198
bool MtmUseDtm ;
197
199
@@ -742,6 +744,27 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
742
744
743
745
}
744
746
747
+ /*
748
+ * Check heartbeats
749
+ */
750
+ static void MtmWatchdog ()
751
+ {
752
+ int i , n = Mtm -> nAllNodes ;
753
+ timestamp_t now = MtmGetSystemTime ();
754
+ for (i = 0 ; i < n ; i ++ ) {
755
+ if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )) {
756
+ if (Mtm -> nodes [i ].lastHeartbeat != 0
757
+ && now > Mtm -> nodes [i ].lastHeartbeat + MSEC_TO_USEC (MtmHeartbeatRecvTimeout ))
758
+ {
759
+ elog (WARNING , "Disable node %d because last heartbeat was received %d msec ago" ,
760
+ i + 1 , (int )USEC_TO_MSEC (now - Mtm -> nodes [i ].lastHeartbeat ));
761
+ MtmOnNodeDisconnect (i + 1 );
762
+ }
763
+ }
764
+ }
765
+ }
766
+
767
+
745
768
static void
746
769
MtmPostPrepareTransaction (MtmCurrentTrans * x )
747
770
{
@@ -771,14 +794,24 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
771
794
MtmUnlock ();
772
795
MtmResetTransaction (x );
773
796
} else {
774
- time_t timeout = Max (Mtm2PCMinTimeout , (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100000 ); /* usec->msec and percents */
797
+ time_t transTimeout = Max (Mtm2PCMinTimeout , (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100000 ); /* usec->msec and percents */
798
+ time_t timeout = transTimeout < MtmHeartbeatRecvTimeout ? transTimeout : MtmHeartbeatRecvTimeout ;
799
+ timestamp_t deadline = MtmGetSystemTime () + MSEC_TO_USEC (transTimeout );
775
800
int result = 0 ;
776
801
int nConfigChanges = Mtm -> nConfigChanges ;
777
802
/* wait votes from all nodes */
778
- while (!ts -> votingCompleted && !( result & WL_TIMEOUT ) ) {
803
+ while (!ts -> votingCompleted ) {
779
804
MtmUnlock ();
805
+ MtmWatchdog ();
780
806
result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET |WL_TIMEOUT , timeout );
781
- ResetLatch (& MyProc -> procLatch );
807
+ if (result & WL_TIMEOUT ) {
808
+ if (MtmGetSystemTime () > deadline ) {
809
+ MtmLock (LW_SHARED );
810
+ break ;
811
+ }
812
+ } else {
813
+ ResetLatch (& MyProc -> procLatch );
814
+ }
782
815
MtmLock (LW_SHARED );
783
816
}
784
817
if (!ts -> votingCompleted ) {
@@ -1023,6 +1056,22 @@ void MtmHandleApplyError(void)
1023
1056
}
1024
1057
1025
1058
1059
+ static void MtmDisableNode (int nodeId )
1060
+ {
1061
+ BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1062
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
1063
+ Mtm -> nodes [nodeId - 1 ].lastHeartbeat = 0 ; /* defuse watchdog until first heartbeat is received */
1064
+ Mtm -> nLiveNodes -= 1 ;
1065
+ }
1066
+
1067
+ static void MtmEnableNode (int nodeId )
1068
+ {
1069
+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1070
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
1071
+ Mtm -> nodes [nodeId - 1 ].lastHeartbeat = 0 ; /* defuse watchdog until first heartbeat is received */
1072
+ Mtm -> nLiveNodes += 1 ;
1073
+ }
1074
+
1026
1075
void MtmRecoveryCompleted (void )
1027
1076
{
1028
1077
MTM_LOG1 ("Recovery of node %d is completed" , MtmNodeId );
@@ -1117,9 +1166,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1117
1166
MTM_LOG1 ("%d: node %d is caugth-up without locking cluster" , MyProcPid , nodeId );
1118
1167
/* We are lucky: caught-up without locking cluster! */
1119
1168
}
1120
- BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1121
- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
1122
- Mtm -> nLiveNodes += 1 ;
1169
+ MtmEnableNode (nodeId );
1123
1170
Mtm -> nConfigChanges += 1 ;
1124
1171
caughtUp = true;
1125
1172
} else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
@@ -1262,17 +1309,13 @@ bool MtmRefreshClusterStatus(bool nowait)
1262
1309
mask = ~clique & (((nodemask_t )1 << Mtm -> nAllNodes )- 1 ) & ~Mtm -> disabledNodeMask ; /* new disabled nodes mask */
1263
1310
for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1264
1311
if (mask & 1 ) {
1265
- Mtm -> nLiveNodes -= 1 ;
1266
- BIT_SET (Mtm -> disabledNodeMask , i );
1267
- Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ();
1312
+ MtmDisableNode (i + 1 );
1268
1313
}
1269
1314
}
1270
1315
mask = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
1271
1316
for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1272
1317
if (mask & 1 ) {
1273
- Mtm -> nLiveNodes += 1 ;
1274
- BIT_CLEAR (Mtm -> disabledNodeMask , i );
1275
- Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ();
1318
+ MtmEnableNode (i + 1 );
1276
1319
}
1277
1320
}
1278
1321
MtmCheckQuorum ();
@@ -1317,7 +1360,6 @@ void MtmOnNodeDisconnect(int nodeId)
1317
1360
/* Avoid false detection of node failure and prevent node status blinking */
1318
1361
return ;
1319
1362
}
1320
-
1321
1363
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1322
1364
BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
1323
1365
RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
@@ -1328,9 +1370,7 @@ void MtmOnNodeDisconnect(int nodeId)
1328
1370
if (!MtmRefreshClusterStatus (false)) {
1329
1371
MtmLock (LW_EXCLUSIVE );
1330
1372
if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1331
- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
1332
- BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1333
- Mtm -> nLiveNodes -= 1 ;
1373
+ MtmDisableNode (nodeId );
1334
1374
MtmCheckQuorum ();
1335
1375
/* Interrupt voting for active transaction and abort them */
1336
1376
for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
@@ -1504,6 +1544,7 @@ static void MtmInitialize()
1504
1544
Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ();
1505
1545
Mtm -> nodes [i ].con = MtmConnections [i ];
1506
1546
Mtm -> nodes [i ].flushPos = 0 ;
1547
+ Mtm -> nodes [i ].lastHeartbeat = 0 ;
1507
1548
}
1508
1549
PGSemaphoreCreate (& Mtm -> votingSemaphore );
1509
1550
PGSemaphoreReset (& Mtm -> votingSemaphore );
@@ -1628,6 +1669,36 @@ _PG_init(void)
1628
1669
if (!process_shared_preload_libraries_in_progress )
1629
1670
return ;
1630
1671
1672
+ DefineCustomIntVariable (
1673
+ "multimaster.heartbeat_send_timeout" ,
1674
+ "Timeout in milliseconds of sending heartbeat messages" ,
1675
+ "Period of broadcasting heartbeat messages by abiter to all nodes" ,
1676
+ & MtmHeartbeatSendTimeout ,
1677
+ 1000 ,
1678
+ 1 ,
1679
+ INT_MAX ,
1680
+ PGC_BACKEND ,
1681
+ 0 ,
1682
+ NULL ,
1683
+ NULL ,
1684
+ NULL
1685
+ );
1686
+
1687
+ DefineCustomIntVariable (
1688
+ "multimaster.heartbeat_recv_timeout" ,
1689
+ "Timeout in milliseconds of receiving heartbeat messages" ,
1690
+ "If no heartbeat message is received from node within this period, it assumed to be dead" ,
1691
+ & MtmHeartbeatRecvTimeout ,
1692
+ 2000 ,
1693
+ 1 ,
1694
+ INT_MAX ,
1695
+ PGC_BACKEND ,
1696
+ 0 ,
1697
+ NULL ,
1698
+ NULL ,
1699
+ NULL
1700
+ );
1701
+
1631
1702
DefineCustomIntVariable (
1632
1703
"multimaster.gc_period" ,
1633
1704
"Number of distributed transactions after which garbage collection is started" ,
@@ -2057,9 +2128,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
2057
2128
{
2058
2129
elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nLiveNodes );
2059
2130
}
2060
- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
2061
- BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
2062
- Mtm -> nLiveNodes -= 1 ;
2131
+ MtmDisableNode (nodeId );
2063
2132
MtmCheckQuorum ();
2064
2133
if (!MtmIsBroadcast ())
2065
2134
{
@@ -2111,17 +2180,13 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
2111
2180
if (MtmIsRecoverySession ) {
2112
2181
MTM_LOG1 ("%d: Node %d start recovery of node %d" , MyProcPid , MtmNodeId , MtmReplicationNodeId );
2113
2182
if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
2114
- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
2115
- BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
2116
- Mtm -> nLiveNodes -= 1 ;
2183
+ MtmDisableNode (MtmReplicationNodeId );
2117
2184
MtmCheckQuorum ();
2118
2185
}
2119
2186
} else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
2120
2187
if (recoveryCompleted ) {
2121
2188
MTM_LOG1 ("Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
2122
- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
2123
- BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
2124
- Mtm -> nLiveNodes += 1 ;
2189
+ MtmEnableNode (MtmReplicationNodeId );
2125
2190
MtmCheckQuorum ();
2126
2191
} else {
2127
2192
elog (ERROR , "Disabled node %d tries to reconnect without recovery" , MtmReplicationNodeId );
0 commit comments