Skip to content

Commit 3c1860a

Browse files
committed
Handle node disconnect
1 parent 73b401f commit 3c1860a

File tree

3 files changed

+52
-11
lines changed

3 files changed

+52
-11
lines changed

contrib/mmts/multimaster.c

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ MtmCheckClusterLock()
10491049
Mtm->nNodes += Mtm->nLockers;
10501050
Mtm->nLockers = 0;
10511051
Mtm->nodeLockerMask = 0;
1052+
MtmCheckQuorum();
10521053
}
10531054
}
10541055
break;
@@ -1058,14 +1059,17 @@ MtmCheckClusterLock()
10581059
/**
10591060
* Build internode connectivity mask. 1 - means that node is disconnected.
10601061
*/
1061-
static void
1062+
static bool
10621063
MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10631064
{
10641065
int i, j, n = MtmNodes;
10651066
for (i = 0; i < n; i++) {
10661067
if (i+1 != MtmNodeId) {
10671068
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
1068-
matrix[i] = data ? *(nodemask_t*)data : 0;
1069+
if (data == NULL) {
1070+
return false;
1071+
}
1072+
matrix[i] = *(nodemask_t*)data;
10691073
} else {
10701074
matrix[i] = Mtm->connectivityMask;
10711075
}
@@ -1076,21 +1080,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10761080
matrix[i] |= ((matrix[j] >> i) & 1) << j;
10771081
}
10781082
}
1083+
return true;
10791084
}
10801085

10811086

10821087
/**
10831088
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
10841089
* This function returns false if the connectivity matrix could not be built (RAFT is not available), true otherwise
10851090
*/
1086-
void MtmRefreshClusterStatus(bool nowait)
1091+
bool MtmRefreshClusterStatus(bool nowait)
10871092
{
10881093
nodemask_t mask, clique;
10891094
nodemask_t matrix[MAX_NODES];
10901095
int clique_size;
10911096
int i;
10921097

1093-
MtmBuildConnectivityMatrix(matrix, nowait);
1098+
if (!MtmBuildConnectivityMatrix(matrix, nowait)) {
1099+
/* RAFT is not available */
1100+
return false;
1101+
}
10941102

10951103
clique = MtmFindMaxClique(matrix, MtmNodes, &clique_size);
10961104
if (clique_size >= MtmNodes/2+1) { /* have quorum */
@@ -1110,6 +1118,7 @@ void MtmRefreshClusterStatus(bool nowait)
11101118
BIT_CLEAR(Mtm->disabledNodeMask, i);
11111119
}
11121120
}
1121+
MtmCheckQuorum();
11131122
MtmUnlock();
11141123
if (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId-1)) {
11151124
if (Mtm->status == MTM_ONLINE) {
@@ -1122,9 +1131,27 @@ void MtmRefreshClusterStatus(bool nowait)
11221131
}
11231132
} else {
11241133
elog(WARNING, "Clique %lx has no quorum", clique);
1134+
Mtm->status = MTM_IN_MINORITY;
11251135
}
1136+
return true;
11261137
}
11271138

1139+
void MtmCheckQuorum(void)
1140+
{
1141+
if (Mtm->nNodes < MtmNodes/2+1) {
1142+
if (Mtm->status == MTM_ONLINE) { /* out of quorum */
1143+
elog(WARNING, "Node is in minority: disabled mask %lx", Mtm->disabledNodeMask);
1144+
Mtm->status = MTM_IN_MINORITY;
1145+
}
1146+
} else {
1147+
if (Mtm->status == MTM_IN_MINORITY) {
1148+
elog(WARNING, "Node is in majority: dissbled mask %lx", Mtm->disabledNodeMask);
1149+
Mtm->status = MTM_ONLINE;
1150+
}
1151+
}
1152+
}
1153+
1154+
11281155
void MtmOnNodeDisconnect(int nodeId)
11291156
{
11301157
BIT_SET(Mtm->connectivityMask, nodeId-1);
@@ -1133,7 +1160,15 @@ void MtmOnNodeDisconnect(int nodeId)
11331160
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11341161
MtmSleep(MtmKeepaliveTimeout);
11351162

1136-
MtmRefreshClusterStatus(false);
1163+
if (!MtmRefreshClusterStatus(false)) {
1164+
MtmLock(LW_EXCLUSIVE);
1165+
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1166+
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1167+
Mtm->nNodes -= 1;
1168+
MtmCheckQuorum();
1169+
}
1170+
MtmUnlock();
1171+
}
11371172
}
11381173

11391174
void MtmOnNodeConnect(int nodeId)
@@ -1635,6 +1670,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16351670
}
16361671
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
16371672
Mtm->nNodes -= 1;
1673+
MtmCheckQuorum();
16381674
if (!MtmIsBroadcast())
16391675
{
16401676
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
@@ -1649,6 +1685,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16491685
static void
16501686
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16511687
{
1688+
elog(WARNING, "Logical replication to node %d is stopped", MtmReplicationNodeId);
16521689
MtmOnNodeDisconnect(MtmReplicationNodeId);
16531690
}
16541691

contrib/mmts/multimaster.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
#include "pglogical_output/hooks.h"
99

1010
#define MTM_TUPLE_TRACE(fmt, ...)
11-
/*
11+
#if 0
1212
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
14-
*/
14+
#else
1515
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1616
#define MTM_TRACE(fmt, ...)
17-
/* */
17+
#endif
1818

1919
#define MULTIMASTER_NAME "multimaster"
2020
#define MULTIMASTER_SCHEMA_NAME "mtm"
@@ -72,7 +72,8 @@ typedef enum
7272
MTM_OFFLINE, /* Node is out of quorum */
7373
MTM_CONNECTED, /* Arbiter is established connections with other nodes */
7474
MTM_ONLINE, /* Ready to receive client's queries */
75-
MTM_RECOVERY /* Node is in recovery process */
75+
MTM_RECOVERY, /* Node is in recovery process */
76+
MTM_IN_MINORITY /* Node is out of quorum */
7677
} MtmNodeStatus;
7778

7879
typedef enum
@@ -193,8 +194,10 @@ extern TransactionId MtmGetCurrentTransactionId(void);
193194
extern XidStatus MtmGetCurrentTransactionStatus(void);
194195
extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
195196
extern bool MtmIsRecoveredNode(int nodeId);
196-
extern void MtmRefreshClusterStatus(bool nowait);
197+
extern bool MtmRefreshClusterStatus(bool nowait);
197198
extern void MtmSwitchClusterMode(MtmNodeStatus mode);
198199
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
199200
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
201+
extern void MtmCheckQuorum(void);
202+
200203
#endif

contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,9 @@ pglogical_receiver_main(Datum main_arg)
240240
if (PQstatus(conn) != CONNECTION_OK)
241241
{
242242
PQfinish(conn);
243-
ereport(ERROR, (errmsg("%s: Could not establish connection to remote server",
243+
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
244244
worker_proc)));
245+
MtmOnNodeDisconnect(args->remote_node);
245246
proc_exit(1);
246247
}
247248

0 commit comments

Comments
 (0)