Skip to content

Commit c2989af

Browse files
committed
recovery in progress
1 parent e6faa29 commit c2989af

File tree

5 files changed

+96
-40
lines changed

5 files changed

+96
-40
lines changed

contrib/mmts/multimaster.c

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
696696
ts->votingCompleted = false;
697697
ts->cmd = MSG_INVALID;
698698
ts->nSubxids = xactGetCommittedChildren(&subxids);
699+
Mtm->nActiveTransactions += 1;
699700

700701
x->isPrepared = true;
701702
x->csn = ts->csn;
@@ -795,6 +796,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
795796
ts->status = TRANSACTION_STATUS_ABORTED;
796797
}
797798
MtmAdjustSubtransactions(ts);
799+
Assert(Mtm->nActiveTransactions != 0);
800+
Mtm->nActiveTransactions -= 1;
798801
}
799802
if (!commit && x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
800803
/*
@@ -836,6 +839,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
836839
}
837840
}
838841

842+
void MtmRecoveryCompleted(void)
843+
{
844+
elog(WARNING, "Recevoery of node %d is completed", MtmNodeId);
845+
Mtm->recoverySlot = 0;
846+
MtmSwitchClusterMode(MTM_ONLINE);
847+
}
848+
839849
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
840850
{
841851
MtmLock(LW_EXCLUSIVE);
@@ -847,8 +857,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
847857
Assert(Mtm->status == MTM_RECOVERY);
848858
} else if (Mtm->status == MTM_RECOVERY) {
849859
/* When recovery is completed we get normal transaction ID and switch to normal mode */
850-
Mtm->recoverySlot = 0;
851-
MtmSwitchClusterMode(MTM_ONLINE);
860+
MtmRecoveryCompleted();
852861
}
853862
MtmTx.gtid = *gtid;
854863
MtmTx.xid = GetCurrentTransactionId();
@@ -973,35 +982,52 @@ static int64 MtmGetSlotLag(int nodeId)
973982
*/
974983
bool MtmIsRecoveredNode(int nodeId)
975984
{
976-
return BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
985+
return BIT_CHECK(Mtm->disabledNodeMask, nodeId-1);
977986
}
978987

979988

980-
void MtmRecoveryPorgress(XLogRecPtr lsn)
989+
bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
981990
{
982-
983-
Assert(MyWalSnd != NULL); /* This function is called by WAL-sender, so it should not be NULL */
984-
if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
985-
&& MyWalSnd->sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr())
991+
bool caughtUp = false;
992+
if (MtmIsRecoveredNode(nodeId)) {
993+
XLogRecPtr walLSN = GetXLogInsertRecPtr();
994+
MtmLock(LW_EXCLUSIVE);
995+
if (slotLSN == walLSN) {
996+
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
997+
elog(WARNING,"Node %d is caught-up", nodeId);
998+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
999+
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
1000+
BIT_CLEAR(Mtm->nodeLockerMask, nodeId-1);
1001+
Mtm->nLockers -= 1;
1002+
} else {
1003+
elog(WARNING,"Node %d is caugth-up without locking cluster", nodeId);
1004+
/* We are lucky: caugth-up without locking cluster! */
1005+
Mtm->nNodes += 1;
1006+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1007+
}
1008+
caughtUp = true;
1009+
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
1010+
&& slotLSN + MtmMinRecoveryLag > walLSN)
9861011
{
9871012
/*
9881013
* Wal sender almost catched up.
9891014
* Lock cluster preventing new transaction to start until wal is completely replayed.
9901015
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
9911016
* Is there some better way to establish mapping between nodes ad WAL-seconder?
9921017
*/
993-
elog(WARNING,"Node %d is catching up", nodeId);
994-
MtmLock(LW_EXCLUSIVE);
1018+
elog(WARNING,"Node %d is almost caught-up: lock cluster", nodeId);
1019+
Assert(MyWalSnd != NULL); /* This function is called by WAL-sender, so it should not be NULL */
9951020
BIT_SET(Mtm->nodeLockerMask, nodeId-1);
9961021
BIT_SET(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
9971022
Mtm->nLockers += 1;
998-
MtmUnlock();
9991023
} else {
1000-
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n", nodeId, MyWalSnd->sentPtr, GetXLogInsertRecPtr(), Mtm->nLockers);
1024+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d\n", nodeId, slotLSN, walLSN, MyWalSnd->sentPtr, Mtm->nLockers, Mtm->nActiveTransactions);
10011025
}
1002-
return true;
1026+
MtmUnlock();
1027+
} else {
1028+
MTM_INFO("Node %d is not in recovery mode\n", nodeId);
10031029
}
1004-
return false;
1030+
return caughtUp;
10051031
}
10061032

10071033
void MtmSwitchClusterMode(MtmNodeStatus mode)
@@ -1020,22 +1046,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10201046
static void
10211047
MtmCheckClusterLock()
10221048
{
1049+
timestamp_t delay = MIN_WAIT_TIMEOUT;
10231050
while (true)
10241051
{
10251052
nodemask_t mask = Mtm->walSenderLockerMask;
10261053
if (mask != 0) {
1027-
XLogRecPtr currLogPos = GetXLogInsertRecPtr();
1028-
int i;
1029-
timestamp_t delay = MIN_WAIT_TIMEOUT;
1030-
for (i = 0; mask != 0; i++, mask >>= 1) {
1031-
if (mask & 1) {
1032-
if (WalSndCtl->walsnds[i].sentPtr != currLogPos) {
1033-
/* recovery is in progress */
1034-
break;
1035-
} else {
1036-
/* recovered replica catched up with master */
1037-
elog(WARNING, "WAL-sender %d complete recovery", i);
1038-
BIT_CLEAR(Mtm->walSenderLockerMask, i);
1054+
if (Mtm->nActiveTransactions == 0) {
1055+
XLogRecPtr currLogPos = GetXLogInsertRecPtr();
1056+
int i;
1057+
for (i = 0; mask != 0; i++, mask >>= 1) {
1058+
if (mask & 1) {
1059+
if (WalSndCtl->walsnds[i].sentPtr != currLogPos) {
1060+
/* recovery is in progress */
1061+
break;
1062+
} else {
1063+
/* recovered replica catched up with master */
1064+
elog(WARNING, "WAL-sender %d complete recovery", i);
1065+
BIT_CLEAR(Mtm->walSenderLockerMask, i);
1066+
}
10391067
}
10401068
}
10411069
}
@@ -1265,6 +1293,7 @@ static void MtmInitialize()
12651293
Mtm->walSenderLockerMask = 0;
12661294
Mtm->nodeLockerMask = 0;
12671295
Mtm->nLockers = 0;
1296+
Mtm->nActiveTransactions = 0;
12681297
Mtm->votingTransactions = NULL;
12691298
Mtm->transListHead = NULL;
12701299
Mtm->transListTail = &Mtm->transListHead;
@@ -1705,12 +1734,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
17051734
static void
17061735
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
17071736
{
1737+
ListCell *param;
1738+
bool isRecoverySession = false;
1739+
foreach(param, args->in_params)
1740+
{
1741+
DefElem *elem = lfirst(param);
1742+
if (strcmp("mtm_replication_mode", elem->defname) == 0) {
1743+
isRecoverySession = elem->arg != NULL && strVal(elem->arg) != NULL && strcmp(strVal(elem->arg), "recovery") == 0;
1744+
break;
1745+
}
1746+
}
17081747
MtmLock(LW_EXCLUSIVE);
1709-
if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
1710-
elog(WARNING, "Recovery of node %d is completed: start normal replication", MtmReplicationNodeId);
1748+
if (isRecoverySession) {
1749+
elog(WARNING, "Node %d start recovery of node %d", MtmNodeId, MtmReplicationNodeId);
1750+
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
1751+
BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
1752+
Mtm->nNodes -= 1;
1753+
MtmCheckQuorum();
1754+
}
1755+
} else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
1756+
elog(WARNING, "Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
17111757
BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
17121758
Mtm->nNodes += 1;
17131759
MtmCheckQuorum();
1760+
} else {
1761+
elog(NOTICE, "Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
17141762
}
17151763
MtmUnlock();
17161764
}
@@ -1728,7 +1776,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
17281776
bool res = Mtm->status != MTM_RECOVERY
17291777
&& (args->origin_id == InvalidRepOriginId
17301778
|| MtmIsRecoveredNode(MtmReplicationNodeId));
1731-
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
1779+
MTM_INFO("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
17321780
return res;
17331781
}
17341782

contrib/mmts/multimaster.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,18 +130,19 @@ typedef struct
130130
nodemask_t pglogicalNodeMask; /* bitmask of started pglogic receivers */
131131
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
132132
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
133-
int nNodes; /* number of active nodes */
134-
int nReceivers; /* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
135-
int nLockers; /* number of lockers */
136-
long timeShift; /* local time correction */
137-
csn_t csn; /* last obtained CSN: used to provide unique acending CSNs based on system time */
133+
int nNodes; /* Number of active nodes */
134+
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
135+
int nLockers; /* Number of lockers */
136+
int nActiveTransactions; /* Nunmber of active 2PC transactions */
137+
long timeShift; /* Local time correction */
138+
csn_t csn; /* Last obtained CSN: used to provide unique acending CSNs based on system time */
138139
MtmTransState* votingTransactions; /* L1-list of replicated transactions sendings notifications to coordinator.
139140
This list is used to pass information to mtm-sender BGW */
140141
MtmTransState* transListHead; /* L1 list of all finished transactions present in xid2state hash.
141142
It is cleanup by MtmGetOldestXmin */
142143
MtmTransState** transListTail; /* Tail of L1 list of all finished transactionds, used to append new elements.
143144
This list is expected to be in CSN ascending order, by strict order may be violated */
144-
uint64 transCount; /* Counter of transactions perfromed by this node */
145+
uint64 transCount; /* Counter of transactions perfromed by this node */
145146
time_t nodeTransDelay[MAX_NODES]; /* Time of waiting transaction acknowledgment from node */
146147
BgwPool pool; /* Pool of background workers for applying logical replication patches */
147148
MtmNodeInfo nodes[1]; /* [MtmNodes]: per-node data */
@@ -200,5 +201,6 @@ extern void MtmSwitchClusterMode(MtmNodeStatus mode);
200201
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
201202
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
202203
extern void MtmCheckQuorum(void);
203-
204+
extern bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN);
205+
extern void MtmRecoveryCompleted(void);
204206
#endif

contrib/mmts/pglogical_apply.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,10 +499,12 @@ process_remote_commit(StringInfo in)
499499
uint8 flags;
500500
csn_t csn;
501501
const char *gid = NULL;
502+
bool caughtUp;
502503

503504
/* read flags */
504505
flags = pq_getmsgbyte(in);
505506
MtmReplicationNode = pq_getmsgbyte(in);
507+
caughtUp = pq_getmsgbyte(in) != 0;
506508

507509
/* read fields */
508510
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
@@ -571,6 +573,9 @@ process_remote_commit(StringInfo in)
571573
Assert(false);
572574
}
573575
MtmEndSession(true);
576+
if (caughtUp) {
577+
MtmRecoveryCompleted();
578+
}
574579
}
575580

576581
static void

contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150

151151
pq_sendbyte(out, 'C'); /* sending COMMIT */
152152

153-
MTM_TRACE("PGLOGICAL_SEND commit: event=%d, gid=%s\n", flags, txn->gid);
153+
MTM_INFO("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx\n", flags, txn->gid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());
154154

155155
/* send the flags field */
156156
pq_sendbyte(out, flags);
157157
pq_sendbyte(out, MtmNodeId);
158+
pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));
158159

159160
/* send fixed fields */
160161
pq_sendint64(out, commit_lsn);
@@ -167,7 +168,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
167168
if (flags != PGLOGICAL_COMMIT) {
168169
pq_sendstring(out, txn->gid);
169170
}
170-
171171
}
172172

173173
/*

contrib/mmts/pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,12 +292,13 @@ pglogical_receiver_main(Datum main_arg)
292292
}
293293
CommitTransactionCommand();
294294

295-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
295+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"mtm_replication_mode\" '%s')",
296296
args->receiver_slot,
297297
(uint32) (originStartPos >> 32),
298298
(uint32) originStartPos,
299299
MULTIMASTER_MAX_PROTO_VERSION,
300-
MULTIMASTER_MIN_PROTO_VERSION
300+
MULTIMASTER_MIN_PROTO_VERSION,
301+
mode == SLOT_OPEN_EXISTED ? "recovery" : "normal"
301302
);
302303
res = PQexec(conn, query->data);
303304
if (PQresultStatus(res) != PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)