@@ -179,6 +179,7 @@ int MtmConnectAttempts;
179
179
int MtmConnectTimeout ;
180
180
int MtmKeepaliveTimeout ;
181
181
int MtmReconnectAttempts ;
182
+ int MtmNodeDisableDelay ;
182
183
bool MtmUseRaftable ;
183
184
MtmConnectionInfo * MtmConnections ;
184
185
@@ -993,6 +994,7 @@ void MtmRecoveryCompleted(void)
993
994
MtmLock (LW_EXCLUSIVE );
994
995
Mtm -> recoverySlot = 0 ;
995
996
BIT_CLEAR (Mtm -> disabledNodeMask , MtmNodeId - 1 );
997
+ Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = time (NULL );
996
998
/* Mode will be changed to online once all logical receivers are connected */
997
999
MtmSwitchClusterMode (MTM_CONNECTED );
998
1000
MtmUnlock ();
@@ -1081,6 +1083,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1081
1083
/* We are lucky: caught up without locking the cluster! */
1082
1084
}
1083
1085
BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1086
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time (NULL );
1084
1087
Mtm -> nNodes += 1 ;
1085
1088
caughtUp = true;
1086
1089
} else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
@@ -1223,13 +1226,15 @@ bool MtmRefreshClusterStatus(bool nowait)
1223
1226
if (mask & 1 ) {
1224
1227
Mtm -> nNodes -= 1 ;
1225
1228
BIT_SET (Mtm -> disabledNodeMask , i );
1229
+ Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
1226
1230
}
1227
1231
}
1228
1232
mask = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
1229
1233
for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1230
1234
if (mask & 1 ) {
1231
1235
Mtm -> nNodes += 1 ;
1232
1236
BIT_CLEAR (Mtm -> disabledNodeMask , i );
1237
+ Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
1233
1238
}
1234
1239
}
1235
1240
MtmCheckQuorum ();
@@ -1269,6 +1274,11 @@ void MtmOnNodeDisconnect(int nodeId)
1269
1274
{
1270
1275
MtmTransState * ts ;
1271
1276
1277
+ if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MtmNodeDisableDelay > time (NULL )) {
1278
+ /* Avoid false detection of node failure and prevent node status blinking */
1279
+ return ;
1280
+ }
1281
+
1272
1282
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1273
1283
BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
1274
1284
RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
@@ -1279,6 +1289,7 @@ void MtmOnNodeDisconnect(int nodeId)
1279
1289
if (!MtmRefreshClusterStatus (false)) {
1280
1290
MtmLock (LW_EXCLUSIVE );
1281
1291
if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1292
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time (NULL );
1282
1293
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1283
1294
Mtm -> nNodes -= 1 ;
1284
1295
MtmCheckQuorum ();
@@ -1446,6 +1457,7 @@ static void MtmInitialize()
1446
1457
for (i = 0 ; i < MtmNodes ; i ++ ) {
1447
1458
Mtm -> nodes [i ].oldestSnapshot = 0 ;
1448
1459
Mtm -> nodes [i ].transDelay = 0 ;
1460
+ Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
1449
1461
Mtm -> nodes [i ].con = MtmConnections [i ];
1450
1462
}
1451
1463
PGSemaphoreCreate (& Mtm -> votingSemaphore );
@@ -1566,10 +1578,25 @@ _PG_init(void)
1566
1578
if (!process_shared_preload_libraries_in_progress )
1567
1579
return ;
1568
1580
1581
+ DefineCustomIntVariable (
1582
+ "multimaster.node_disable_delay" ,
1583
+ "Minamal amount of time (sec) between node status change" ,
1584
+ "This delay is used to avoid false detection of node failure and to prevent blinking of node status node" ,
1585
+ & MtmNodeDisableDelay ,
1586
+ 1 ,
1587
+ 1 ,
1588
+ INT_MAX ,
1589
+ PGC_BACKEND ,
1590
+ 0 ,
1591
+ NULL ,
1592
+ NULL ,
1593
+ NULL
1594
+ );
1595
+
1569
1596
DefineCustomIntVariable (
1570
1597
"multimaster.min_recovery_lag" ,
1571
1598
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed" ,
1572
- "When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile compeition ' and "
1599
+ "When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile competition ' and "
1573
1600
"temporary stop commit of new transactions until node will be completely repared" ,
1574
1601
& MtmMinRecoveryLag ,
1575
1602
100000 ,
@@ -1891,6 +1918,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
1891
1918
{
1892
1919
elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nNodes );
1893
1920
}
1921
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time (NULL );
1894
1922
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1895
1923
Mtm -> nNodes -= 1 ;
1896
1924
MtmCheckQuorum ();
@@ -1941,13 +1969,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1941
1969
if (MtmIsRecoverySession ) {
1942
1970
MTM_LOG1 ("%d: Node %d start recovery of node %d" , MyProcPid , MtmNodeId , MtmReplicationNodeId );
1943
1971
if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1972
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time (NULL );
1944
1973
BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1945
1974
Mtm -> nNodes -= 1 ;
1946
1975
MtmCheckQuorum ();
1947
1976
}
1948
1977
} else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1949
1978
if (recoveryCompleted ) {
1950
1979
MTM_LOG1 ("Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
1980
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time (NULL );
1951
1981
BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1952
1982
Mtm -> nNodes += 1 ;
1953
1983
MtmCheckQuorum ();
@@ -2058,8 +2088,8 @@ typedef struct
2058
2088
int nodeId ;
2059
2089
char * connStrPtr ;
2060
2090
TupleDesc desc ;
2061
- Datum values [7 ];
2062
- bool nulls [7 ];
2091
+ Datum values [8 ];
2092
+ bool nulls [8 ];
2063
2093
} MtmGetNodeStateCtx ;
2064
2094
2065
2095
Datum
@@ -2096,11 +2126,12 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2096
2126
usrfctx -> values [4 ] = Int64GetDatum (lag );
2097
2127
usrfctx -> nulls [4 ] = lag < 0 ;
2098
2128
usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
2129
+ usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2099
2130
p = strchr (usrfctx -> connStrPtr , ',' );
2100
2131
if (p != NULL ) {
2101
2132
* p ++ = '\0' ;
2102
2133
}
2103
- usrfctx -> values [6 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2134
+ usrfctx -> values [7 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2104
2135
usrfctx -> connStrPtr = p ;
2105
2136
usrfctx -> nodeId += 1 ;
2106
2137
0 commit comments