Skip to content

Commit 4760ea1

Browse files
committed
Add status change time
1 parent 406ab79 commit 4760ea1

File tree

4 files changed

+40
-8
lines changed

4 files changed

+40
-8
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

2626

27-
CREATE TYPE mtm.node_state AS (id integer, disabled bool, disconnected bool, catchUp bool, slotLag bigint, avgTransDelay bigint, connStr text);
27+
CREATE TYPE mtm.node_state AS (id integer, disabled bool, disconnected bool, catchUp bool, slotLag bigint, avgTransDelay bigint, lastStatusChange timestamp, connStr text);
2828

2929
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3030
AS 'MODULE_PATHNAME','mtm_get_nodes_state'

contrib/mmts/multimaster.c

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ int MtmConnectAttempts;
179179
int MtmConnectTimeout;
180180
int MtmKeepaliveTimeout;
181181
int MtmReconnectAttempts;
182+
int MtmNodeDisableDelay;
182183
bool MtmUseRaftable;
183184
MtmConnectionInfo* MtmConnections;
184185

@@ -993,6 +994,7 @@ void MtmRecoveryCompleted(void)
993994
MtmLock(LW_EXCLUSIVE);
994995
Mtm->recoverySlot = 0;
995996
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
997+
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = time(NULL);
996998
/* Mode will be changed to online once all locagical reciever are connected */
997999
MtmSwitchClusterMode(MTM_CONNECTED);
9981000
MtmUnlock();
@@ -1081,6 +1083,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10811083
/* We are lucky: caugth-up without locking cluster! */
10821084
}
10831085
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1086+
Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
10841087
Mtm->nNodes += 1;
10851088
caughtUp = true;
10861089
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
@@ -1223,13 +1226,15 @@ bool MtmRefreshClusterStatus(bool nowait)
12231226
if (mask & 1) {
12241227
Mtm->nNodes -= 1;
12251228
BIT_SET(Mtm->disabledNodeMask, i);
1229+
Mtm->nodes[i].lastStatusChangeTime = time(NULL);
12261230
}
12271231
}
12281232
mask = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
12291233
for (i = 0; mask != 0; i++, mask >>= 1) {
12301234
if (mask & 1) {
12311235
Mtm->nNodes += 1;
12321236
BIT_CLEAR(Mtm->disabledNodeMask, i);
1237+
Mtm->nodes[i].lastStatusChangeTime = time(NULL);
12331238
}
12341239
}
12351240
MtmCheckQuorum();
@@ -1269,6 +1274,11 @@ void MtmOnNodeDisconnect(int nodeId)
12691274
{
12701275
MtmTransState *ts;
12711276

1277+
if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MtmNodeDisableDelay > time(NULL)) {
1278+
/* Avoid false detection of node failure and prevent node status blinking */
1279+
return;
1280+
}
1281+
12721282
BIT_SET(Mtm->connectivityMask, nodeId-1);
12731283
BIT_SET(Mtm->reconnectMask, nodeId-1);
12741284
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
@@ -1279,6 +1289,7 @@ void MtmOnNodeDisconnect(int nodeId)
12791289
if (!MtmRefreshClusterStatus(false)) {
12801290
MtmLock(LW_EXCLUSIVE);
12811291
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1292+
Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
12821293
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
12831294
Mtm->nNodes -= 1;
12841295
MtmCheckQuorum();
@@ -1446,6 +1457,7 @@ static void MtmInitialize()
14461457
for (i = 0; i < MtmNodes; i++) {
14471458
Mtm->nodes[i].oldestSnapshot = 0;
14481459
Mtm->nodes[i].transDelay = 0;
1460+
Mtm->nodes[i].lastStatusChangeTime = time(NULL);
14491461
Mtm->nodes[i].con = MtmConnections[i];
14501462
}
14511463
PGSemaphoreCreate(&Mtm->votingSemaphore);
@@ -1566,10 +1578,25 @@ _PG_init(void)
15661578
if (!process_shared_preload_libraries_in_progress)
15671579
return;
15681580

1581+
DefineCustomIntVariable(
1582+
"multimaster.node_disable_delay",
1583+
"Minamal amount of time (sec) between node status change",
1584+
"This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
1585+
&MtmNodeDisableDelay,
1586+
1,
1587+
1,
1588+
INT_MAX,
1589+
PGC_BACKEND,
1590+
0,
1591+
NULL,
1592+
NULL,
1593+
NULL
1594+
);
1595+
15691596
DefineCustomIntVariable(
15701597
"multimaster.min_recovery_lag",
15711598
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed",
1572-
"When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile compeition' and "
1599+
"When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile competition' and "
15731600
"temporary stop commit of new transactions until node will be completely repared",
15741601
&MtmMinRecoveryLag,
15751602
100000,
@@ -1891,6 +1918,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
18911918
{
18921919
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nNodes);
18931920
}
1921+
Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
18941922
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
18951923
Mtm->nNodes -= 1;
18961924
MtmCheckQuorum();
@@ -1941,13 +1969,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
19411969
if (MtmIsRecoverySession) {
19421970
MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
19431971
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
1972+
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
19441973
BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
19451974
Mtm->nNodes -= 1;
19461975
MtmCheckQuorum();
19471976
}
19481977
} else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
19491978
if (recoveryCompleted) {
19501979
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
1980+
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
19511981
BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
19521982
Mtm->nNodes += 1;
19531983
MtmCheckQuorum();
@@ -2058,8 +2088,8 @@ typedef struct
20582088
int nodeId;
20592089
char* connStrPtr;
20602090
TupleDesc desc;
2061-
Datum values[7];
2062-
bool nulls[7];
2091+
Datum values[8];
2092+
bool nulls[8];
20632093
} MtmGetNodeStateCtx;
20642094

20652095
Datum
@@ -2096,11 +2126,12 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
20962126
usrfctx->values[4] = Int64GetDatum(lag);
20972127
usrfctx->nulls[4] = lag < 0;
20982128
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
2129+
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
20992130
p = strchr(usrfctx->connStrPtr, ',');
21002131
if (p != NULL) {
21012132
*p++ = '\0';
21022133
}
2103-
usrfctx->values[6] = CStringGetTextDatum(usrfctx->connStrPtr);
2134+
usrfctx->values[7] = CStringGetTextDatum(usrfctx->connStrPtr);
21042135
usrfctx->connStrPtr = p;
21052136
usrfctx->nodeId += 1;
21062137

contrib/mmts/multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ typedef struct
119119
{
120120
MtmConnectionInfo con;
121121
time_t transDelay;
122+
time_t lastStatusChangeTime;
122123
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
123124
} MtmNodeInfo;
124125

@@ -170,7 +171,6 @@ typedef struct
170171
MtmTransState** transListTail; /* Tail of L1 list of all finished transactionds, used to append new elements.
171172
This list is expected to be in CSN ascending order, by strict order may be violated */
172173
uint64 transCount; /* Counter of transactions perfromed by this node */
173-
time_t nodeTransDelay[MAX_NODES]; /* Time of waiting transaction acknowledgment from node */
174174
BgwPool pool; /* Pool of background workers for applying logical replication patches */
175175
MtmNodeInfo nodes[1]; /* [MtmNodes]: per-node data */
176176
} MtmState;
@@ -190,6 +190,7 @@ extern int MtmConnectAttempts;
190190
extern int MtmConnectTimeout;
191191
extern int MtmReconnectAttempts;
192192
extern int MtmKeepaliveTimeout;
193+
extern int MtmNodeDisableDelay;
193194
extern HTAB* MtmXid2State;
194195

195196
extern MtmConnectionInfo* MtmConnections;

contrib/mmts/t/001_basic_recovery.pl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ sub PostgresNode::inet_connstr {
115115

116116
diag("starting node 2");
117117
$nodes[2]->start;
118-
diag("sleeping 30");
119-
sleep(30); # XXX: here we can poll
118+
diag("sleeping 10");
119+
sleep(10); # XXX: here we can poll
120120
diag("inserting 3");
121121
$nodes[0]->psql('postgres', "insert into t values(3, 30);");
122122
diag("selecting");

0 commit comments

Comments
 (0)