@@ -1049,6 +1049,7 @@ MtmCheckClusterLock()
1049
1049
Mtm -> nNodes += Mtm -> nLockers ;
1050
1050
Mtm -> nLockers = 0 ;
1051
1051
Mtm -> nodeLockerMask = 0 ;
1052
+ MtmCheckQuorum ();
1052
1053
}
1053
1054
}
1054
1055
break ;
@@ -1058,14 +1059,17 @@ MtmCheckClusterLock()
1058
1059
/**
1059
1060
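* Build internode connectivity matrix: a bit set to 1 means that the node is disconnected.
* Returns false if the mask of some other node could not be obtained, true otherwise.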
1060
1061
*/
1061
- static void
1062
+ static bool
1062
1063
MtmBuildConnectivityMatrix (nodemask_t * matrix , bool nowait )
1063
1064
{
1064
1065
int i , j , n = MtmNodes ;
1065
1066
for (i = 0 ; i < n ; i ++ ) {
1066
1067
if (i + 1 != MtmNodeId ) {
1067
1068
void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1068
- matrix [i ] = data ? * (nodemask_t * )data : 0 ;
1069
+ if (data == NULL ) {
1070
+ return false;
1071
+ }
1072
+ matrix [i ] = * (nodemask_t * )data ;
1069
1073
} else {
1070
1074
matrix [i ] = Mtm -> connectivityMask ;
1071
1075
}
@@ -1076,21 +1080,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1076
1080
matrix [i ] |= ((matrix [j ] >> i ) & 1 ) << j ;
1077
1081
}
1078
1082
}
1083
+ return true;
1079
1084
}
1080
1085
1081
1086
1082
1087
/**
1083
1088
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
1084
1089
* This function returns false if the connectivity matrix could not be built (the mask of some node is unavailable), true otherwise.
1085
1090
*/
1086
- void MtmRefreshClusterStatus (bool nowait )
1091
+ bool MtmRefreshClusterStatus (bool nowait )
1087
1092
{
1088
1093
nodemask_t mask , clique ;
1089
1094
nodemask_t matrix [MAX_NODES ];
1090
1095
int clique_size ;
1091
1096
int i ;
1092
1097
1093
- MtmBuildConnectivityMatrix (matrix , nowait );
1098
+ if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1099
+ /* RAFT is not available */
1100
+ return false;
1101
+ }
1094
1102
1095
1103
clique = MtmFindMaxClique (matrix , MtmNodes , & clique_size );
1096
1104
if (clique_size >= MtmNodes /2 + 1 ) { /* have quorum */
@@ -1110,6 +1118,7 @@ void MtmRefreshClusterStatus(bool nowait)
1110
1118
BIT_CLEAR (Mtm -> disabledNodeMask , i );
1111
1119
}
1112
1120
}
1121
+ MtmCheckQuorum ();
1113
1122
MtmUnlock ();
1114
1123
if (BIT_CHECK (Mtm -> disabledNodeMask , MtmNodeId - 1 )) {
1115
1124
if (Mtm -> status == MTM_ONLINE ) {
@@ -1122,9 +1131,27 @@ void MtmRefreshClusterStatus(bool nowait)
1122
1131
}
1123
1132
} else {
1124
1133
elog (WARNING , "Clique %lx has no quorum" , clique );
1134
+ Mtm -> status = MTM_IN_MINORITY ;
1125
1135
}
1136
+ return true;
1126
1137
}
1127
1138
1139
+ void MtmCheckQuorum (void )
1140
+ {
1141
+ if (Mtm -> nNodes < MtmNodes /2 + 1 ) {
1142
+ if (Mtm -> status == MTM_ONLINE ) { /* out of quorum */
1143
+ elog (WARNING , "Node is in minority: disabled mask %lx" , Mtm -> disabledNodeMask );
1144
+ Mtm -> status = MTM_IN_MINORITY ;
1145
+ }
1146
+ } else {
1147
+ if (Mtm -> status == MTM_IN_MINORITY ) {
1148
+ elog (WARNING , "Node is in majority: dissbled mask %lx" , Mtm -> disabledNodeMask );
1149
+ Mtm -> status = MTM_ONLINE ;
1150
+ }
1151
+ }
1152
+ }
1153
+
1154
+
1128
1155
void MtmOnNodeDisconnect (int nodeId )
1129
1156
{
1130
1157
BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
@@ -1133,7 +1160,15 @@ void MtmOnNodeDisconnect(int nodeId)
1133
1160
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1134
1161
MtmSleep (MtmKeepaliveTimeout );
1135
1162
1136
- MtmRefreshClusterStatus (false);
1163
+ if (!MtmRefreshClusterStatus (false)) {
1164
+ MtmLock (LW_EXCLUSIVE );
1165
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1166
+ BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1167
+ Mtm -> nNodes -= 1 ;
1168
+ MtmCheckQuorum ();
1169
+ }
1170
+ MtmUnlock ();
1171
+ }
1137
1172
}
1138
1173
1139
1174
void MtmOnNodeConnect (int nodeId )
@@ -1635,6 +1670,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
1635
1670
}
1636
1671
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1637
1672
Mtm -> nNodes -= 1 ;
1673
+ MtmCheckQuorum ();
1638
1674
if (!MtmIsBroadcast ())
1639
1675
{
1640
1676
MtmBroadcastUtilityStmt (psprintf ("select mtm.drop_node(%d,%s)" , nodeId , dropSlot ? "true" : "false" ), true);
@@ -1649,6 +1685,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
1649
1685
static void
1650
1686
MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
1651
1687
{
1688
+ elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1652
1689
MtmOnNodeDisconnect (MtmReplicationNodeId );
1653
1690
}
1654
1691
0 commit comments