Skip to content

Commit c61ddc3

Browse files
committed
Recovery fixes
1 parent 0201b1e commit c61ddc3

File tree

7 files changed

+89
-49
lines changed

7 files changed

+89
-49
lines changed

contrib/mmts/arbiter.c

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,13 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
247247
while (size != 0) {
248248
int rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
249249
if (rc == 1) {
250-
int n = send(sd, src, size, 0);
251-
if (n < 0) {
252-
Assert(errno != EINTR); /* should not happen in non-blocking call */
250+
while ((rc = send(sd, src, size, 0)) < 0 && errno == EINTR);
251+
if (rc < 0) {
253252
busy_socket = -1;
254253
return false;
255254
}
256-
size -= n;
257-
src += n;
255+
size -= rc;
256+
src += rc;
258257
} else if (rc < 0) {
259258
busy_socket = -1;
260259
return false;
@@ -266,15 +265,12 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266265

267266
static int MtmReadSocket(int sd, void* buf, int buf_size)
268267
{
269-
int rc = recv(sd, buf, buf_size, 0);
268+
int rc;
269+
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
270270
if (rc < 0 && errno == EAGAIN) {
271271
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
272272
if (rc == 1) {
273-
rc = recv(sd, buf, buf_size, 0);
274-
if (rc < 0) {
275-
Assert(errno != EINTR); /* should not happen in non-blocking call */
276-
return -1;
277-
}
273+
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
278274
} else {
279275
return 0;
280276
}
@@ -370,12 +366,13 @@ static void MtmSendHeartbeat()
370366
for (i = 0; i < Mtm->nAllNodes; i++)
371367
{
372368
if (i+1 != MtmNodeId && sockets[i] != busy_socket
373-
&& ((sockets[i] >= 0 && !BIT_CHECK(Mtm->disabledNodeMask, i)) || BIT_CHECK(Mtm->reconnectMask, i)))
369+
&& (Mtm->status != MTM_ONLINE
370+
|| (sockets[i] >= 0 && !BIT_CHECK(Mtm->disabledNodeMask, i) && !BIT_CHECK(Mtm->reconnectMask, i))))
374371
{
375372
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
376373
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
377374
} else {
378-
MTM_LOG1("Send heartbeat to node %d with timestamp %ld", i+1, now);
375+
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
379376
}
380377
}
381378
}
@@ -593,8 +590,9 @@ static void MtmAcceptOneConnection()
593590
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
594591
elog(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
595592
close(fd);
596-
} else{
597-
Assert(req.hdr.node > 0 && req.hdr.node <= Mtm->nAllNodes && req.hdr.node != MtmNodeId);
593+
} else {
594+
int node = req.hdr.node-1;
595+
Assert(node >= 0 && node < Mtm->nAllNodes && node+1 != MtmNodeId);
598596

599597
MtmLock(LW_EXCLUSIVE);
600598
MtmCheckResponse(&req.hdr);
@@ -606,15 +604,18 @@ static void MtmAcceptOneConnection()
606604
resp.sxid = ShmemVariableCache->nextXid;
607605
resp.csn = MtmGetCurrentTime();
608606
resp.node = MtmNodeId;
609-
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
607+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con, req.connStr);
610608
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
611-
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);
609+
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", node+1);
612610
close(fd);
613611
} else {
614-
MTM_LOG1("Arbiter established connection with node %d", req.hdr.node);
615-
MtmRegisterSocket(fd, req.hdr.node-1);
616-
sockets[req.hdr.node-1] = fd;
617-
MtmOnNodeConnect(req.hdr.node);
612+
MTM_LOG1("Arbiter established connection with node %d", node+1);
613+
if (sockets[node] >= 0) {
614+
MtmUnregisterSocket(sockets[node]);
615+
}
616+
sockets[node] = fd;
617+
MtmRegisterSocket(fd, node);
618+
MtmOnNodeConnect(node+1);
618619
}
619620
}
620621
}
@@ -889,7 +890,7 @@ static void MtmTransReceiver(Datum arg)
889890
Mtm->nodes[msg->node-1].lastHeartbeat = MtmGetSystemTime();
890891

891892
if (msg->code == MSG_HEARTBEAT) {
892-
MTM_LOG1("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893+
MTM_LOG2("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893894
msg->node, msg->csn, USEC_TO_MSEC(MtmGetSystemTime() - msg->csn));
894895
continue;
895896
}
@@ -1002,21 +1003,23 @@ static void MtmTransReceiver(Datum arg)
10021003
}
10031004
}
10041005
}
1005-
now = MtmGetSystemTime();
1006-
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1007-
if (!MtmWatchdog(stopPolling)) {
1008-
for (i = 0; i < nNodes; i++) {
1009-
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1010-
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1006+
if (Mtm->status != MTM_RECOVERY) {
1007+
now = MtmGetSystemTime();
1008+
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1009+
if (!MtmWatchdog(stopPolling)) {
1010+
for (i = 0; i < nNodes; i++) {
1011+
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1012+
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1013+
}
10111014
}
1015+
MTM_LOG1("epoll started %ld and finished %ld microseconds ago", now - startPolling, now - stopPolling);
10121016
}
1013-
MTM_LOG1("epoll started %ld and finished %ld microseconds ago", now - startPolling, now - stopPolling);
1017+
lastHeartbeatCheck = now;
1018+
}
1019+
if (n == 0 && Mtm->disabledNodeMask != 0) {
1020+
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1021+
MtmRefreshClusterStatus(false);
10141022
}
1015-
lastHeartbeatCheck = now;
1016-
}
1017-
if (n == 0 && Mtm->disabledNodeMask != 0) {
1018-
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1019-
MtmRefreshClusterStatus(false);
10201023
}
10211024
}
10221025
proc_exit(1); /* force restart of this bgworker */

contrib/mmts/multimaster.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,6 @@ MtmCreateTransState(MtmCurrentTrans* x)
737737
/* I am coordinator of transaction */
738738
ts->gtid.xid = x->xid;
739739
ts->gtid.node = MtmNodeId;
740-
//ts->gid[0] = '\0';
741740
strcpy(ts->gid, x->gid);
742741
}
743742
return ts;
@@ -1181,26 +1180,33 @@ static void MtmDisableNode(int nodeId)
11811180
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
11821181
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
11831182
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1184-
Mtm->nLiveNodes -= 1;
1185-
}
1183+
if (nodeId != MtmNodeId) {
1184+
Mtm->nLiveNodes -= 1;
1185+
}
1186+
elog(WARNING, "Disable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
1187+
}
11861188

11871189
static void MtmEnableNode(int nodeId)
11881190
{
11891191
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
11901192
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
11911193
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
11921194
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1193-
Mtm->nLiveNodes += 1;
1195+
if (nodeId != MtmNodeId) {
1196+
Mtm->nLiveNodes += 1;
1197+
}
1198+
elog(WARNING, "Enable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
11941199
}
11951200

11961201
void MtmRecoveryCompleted(void)
11971202
{
1198-
MTM_LOG1("Recovery of node %d is completed", MtmNodeId);
1203+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%ld, live nodes=%d",
1204+
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
11991205
MtmLock(LW_EXCLUSIVE);
12001206
Mtm->recoverySlot = 0;
1201-
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
12021207
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1203-
/* Mode will be changed to online once all locagical reciever are connected */
1208+
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1209+
/* Mode will be changed to online once all logical receivers are connected */
12041210
MtmSwitchClusterMode(MTM_CONNECTED);
12051211
MtmUnlock();
12061212
}
@@ -1459,16 +1465,16 @@ bool MtmRefreshClusterStatus(bool nowait)
14591465
MtmEnableNode(i+1);
14601466
}
14611467
}
1462-
#endif
14631468
Mtm->reconnectMask |= clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
1469+
#endif
14641470

14651471
if (disabled) {
14661472
MtmCheckQuorum();
14671473
}
14681474
/* Interrupt voting for active transaction and abort them */
14691475
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
14701476
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1471-
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
1477+
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
14721478
if (MtmIsCoordinator(ts)) {
14731479
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
14741480
MtmAbortTransaction(ts);
@@ -1723,6 +1729,7 @@ static void MtmInitialize()
17231729
Mtm->transCount = 0;
17241730
Mtm->gcCount = 0;
17251731
Mtm->nConfigChanges = 0;
1732+
Mtm->recoveryCount = 0;
17261733
Mtm->localTablesHashLoaded = false;
17271734
Mtm->inject2PCError = 0;
17281735
for (i = 0; i < MtmNodes; i++) {
@@ -2266,6 +2273,9 @@ void MtmReceiverStarted(int nodeId)
22662273
MtmLock(LW_EXCLUSIVE);
22672274
if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
22682275
BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
2276+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2277+
MtmEnableNode(nodeId);
2278+
}
22692279
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
22702280
if (Mtm->status == MTM_CONNECTED) {
22712281
MtmSwitchClusterMode(MTM_ONLINE);
@@ -2291,6 +2301,9 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
22912301
/* Choose for recovery first available slot */
22922302
MTM_LOG1("Start recovery from node %d", nodeId);
22932303
Mtm->recoverySlot = nodeId;
2304+
Mtm->nReceivers = 0;
2305+
Mtm->recoveryCount += 1;
2306+
Mtm->pglogicalNodeMask = 0;
22942307
FinishAllPreparedTransactions(false);
22952308
return SLOT_OPEN_EXISTED;
22962309
}

contrib/mmts/multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ typedef struct
174174
LWLockPadded *locks; /* multimaster lock tranche */
175175
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
176176
nodemask_t disabledNodeMask; /* bitmask of disabled nodes */
177-
nodemask_t connectivityMask; /* bitmask of dicconnected nodes */
177+
nodemask_t connectivityMask; /* bitmask of disconnected nodes */
178178
nodemask_t pglogicalNodeMask; /* bitmask of started pglogic receivers */
179179
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
180180
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
@@ -188,6 +188,7 @@ typedef struct
188188
int nLockers; /* Number of lockers */
189189
int nActiveTransactions; /* Number of active 2PC transactions */
190190
int nConfigChanges; /* Number of cluster configuration changes */
191+
int recoveryCount; /* Number of started recoveries (incremented each time a recovery slot is chosen) */
191192
int64 timeShift; /* Local time correction */
192193
csn_t csn; /* Last obtained timestamp: used to provide unique ascending CSNs based on system time */
193194
csn_t lastCsn; /* CSN of last committed transaction */

contrib/mmts/pglogical_apply.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ static void
492492
MtmEndSession(bool unlock)
493493
{
494494
if (replorigin_session_origin != InvalidRepOriginId) {
495-
MTM_LOG3("%d: Begin reset replorigin session: %d", MyProcPid, replorigin_session_origin);
495+
MTM_LOG2("%d: Begin reset replorigin session for node %d: %d, progress %lx", MyProcPid, MtmReplicationNodeId, replorigin_session_origin, replorigin_session_get_progress(false));
496496
replorigin_session_origin = InvalidRepOriginId;
497497
replorigin_session_reset();
498498
if (unlock) {
@@ -568,7 +568,7 @@ process_remote_commit(StringInfo in)
568568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569569
csn = pq_getmsgint64(in);
570570
gid = pq_getmsgstring(in);
571-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", csn, gid);
571+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
572572
StartTransactionCommand();
573573
MtmBeginSession();
574574
MtmSetCurrentTransactionCSN(csn);
@@ -585,6 +585,7 @@ process_remote_commit(StringInfo in)
585585
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
586586
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
587587
StartTransactionCommand();
588+
MtmBeginSession();
588589
MtmSetCurrentTransactionGID(gid);
589590
FinishPreparedTransaction(gid, false);
590591
CommitTransactionCommand();
@@ -594,6 +595,12 @@ process_remote_commit(StringInfo in)
594595
default:
595596
Assert(false);
596597
}
598+
#if 0 /* Do not need to advance slot position here: it will be done by transaction commit */
599+
if (replorigin_session_origin != InvalidRepOriginId) {
600+
replorigin_advance(replorigin_session_origin, end_lsn,
601+
XactLastCommitEnd, false, false);
602+
}
603+
#endif
597604
MtmEndSession(true);
598605
MtmUpdateLsnMapping(MtmReplicationNodeId, end_lsn);
599606
if (flags & PGLOGICAL_CAUGHT_UP) {
@@ -936,6 +943,11 @@ void MtmExecutor(int id, void* work, size_t size)
936943
while (true) {
937944
char action = pq_getmsgbyte(&s);
938945
MTM_LOG3("%d: REMOTE process action %c", MyProcPid, action);
946+
#if 0
947+
if (Mtm->status == MTM_RECOVERY) {
948+
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);
949+
}
950+
#endif
939951
switch (action) {
940952
/* BEGIN */
941953
case 'B':

contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,8 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
424424
PGLogicalProtoAPI *
425425
pglogical_init_api(PGLogicalProtoType typ)
426426
{
427-
PGLogicalProtoAPI* res = palloc0(sizeof(PGLogicalProtoAPI));
427+
PGLogicalProtoAPI* res = malloc(sizeof(PGLogicalProtoAPI));
428+
MemSet(res, 0, sizeof(PGLogicalProtoAPI));
428429
sscanf(MyReplicationSlot->data.name.data, MULTIMASTER_SLOT_PATTERN, &MtmReplicationNodeId);
429430
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d", MyProcPid, MyReplicationSlot->data.name.data, MtmReplicationNodeId);
430431
res->write_rel = pglogical_write_rel;

contrib/mmts/pglogical_receiver.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,15 @@ pglogical_receiver_main(Datum main_arg)
244244
*/
245245
while (!got_sigterm)
246246
{
247+
int count;
248+
247249
/*
248250
* Determine when and how we should open replication slot.
249251
* During recovery we need to open only one replication slot from which node should receive all transactions.
250252
* Slots at other nodes should be removed
251253
*/
252254
mode = MtmReceiverSlotMode(nodeId);
255+
count = Mtm->recoveryCount;
253256

254257
/* Establish connection to remote server */
255258
conn = PQconnectdb(connString);
@@ -303,7 +306,7 @@ pglogical_receiver_main(Datum main_arg)
303306
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
304307
} else {
305308
originStartPos = replorigin_get_progress(originId, false);
306-
MTM_LOG1("Restart logical receiver at position %lx from node %d", originStartPos, nodeId);
309+
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
307310
}
308311
CommitTransactionCommand();
309312

@@ -359,7 +362,12 @@ pglogical_receiver_main(Datum main_arg)
359362

360363
if (Mtm->status == MTM_OFFLINE || (Mtm->status == MTM_RECOVERY && Mtm->recoverySlot != nodeId))
361364
{
362-
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
365+
ereport(LOG, (errmsg("%s: restart WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
366+
break;
367+
}
368+
if (count != Mtm->recoveryCount) {
369+
370+
ereport(LOG, (errmsg("%s: restart WAL receiver because node was recovered", worker_proc)));
363371
break;
364372
}
365373

src/backend/access/transam/twophase.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,10 +2279,12 @@ FinishAllPreparedTransactions(bool isCommit)
22792279

22802280
if (gxact->valid)
22812281
{
2282+
elog(LOG, "Finish prepared transaction %s", gxact->gid);
22822283
FinishPreparedTransaction(gxact->gid, isCommit);
22832284
count++;
22842285
}
22852286
}
2287+
elog(LOG, "Finish %d prepared transactions", count);
22862288

22872289
return count;
22882290
}

0 commit comments

Comments
 (0)