Skip to content

Commit 269d8dd

Browse files
committed
Node recovery
1 parent 5f43df1 commit 269d8dd

File tree

5 files changed

+33
-13
lines changed

5 files changed

+33
-13
lines changed

contrib/mmts/arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
348348
/* Some node considered that I am dead, so switch to recovery mode */
349349
if (BIT_CHECK(resp.disabledNodeMask, MtmNodeId-1)) {
350350
elog(WARNING, "Node %d think that I am dead", resp.node);
351+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
351352
MtmSwitchClusterMode(MTM_RECOVERY);
352353
}
353-
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
354-
Mtm->disabledNodeMask |= resp.disabledNodeMask;
355354
return sd;
356355
}
357356
}
@@ -377,7 +376,7 @@ static void MtmOpenConnections()
377376
}
378377
if (Mtm->nNodes < MtmNodes/2+1) { /* no quorum */
379378
elog(WARNING, "Node is out of quorum: only %d nodes from %d are accssible", Mtm->nNodes, MtmNodes);
380-
Mtm->status = MTM_OFFLINE;
379+
Mtm->status = MTM_IN_MINORITY;
381380
} else if (Mtm->status == MTM_INITIALIZATION) {
382381
MtmSwitchClusterMode(MTM_CONNECTED);
383382
}
@@ -431,6 +430,7 @@ static void MtmAcceptOneConnection()
431430
resp.dxid = HANDSHAKE_MAGIC;
432431
resp.sxid = ShmemVariableCache->nextXid;
433432
resp.csn = MtmGetCurrentTime();
433+
resp.node = MtmNodeId;
434434
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
435435
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
436436
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);

contrib/mmts/multimaster.c

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ char const* const MtmNodeStatusMnem[] =
156156
"Offline",
157157
"Connected",
158158
"Online",
159-
"Recovery"
159+
"Recovery",
160+
"InMinor"
160161
};
161162

162163
bool MtmDoReplication;
@@ -633,10 +634,10 @@ MtmBeginTransaction(MtmCurrentTrans* x)
633634
x->isDistributed = MtmIsUserTransaction();
634635
x->isPrepared = false;
635636
x->isTransactionBlock = IsTransactionBlock();
636-
if (x->isDistributed && Mtm->status != MTM_ONLINE) {
637+
/* Application name can be cahnged usnig PGAPPNAME environment variable */
638+
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
637639
/* reject all user's transactions at offline cluster */
638640
MtmUnlock();
639-
Assert(Mtm->status == MTM_ONLINE);
640641
elog(ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
641642
}
642643
x->containsDML = false;
@@ -983,11 +984,14 @@ bool MtmIsRecoveredNode(int nodeId)
983984
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
984985
* Is there some better way to establish mapping between nodes ad WAL-seconder?
985986
*/
987+
elog(WARNING,"Node %d is catching up", nodeId);
986988
MtmLock(LW_EXCLUSIVE);
987989
BIT_SET(Mtm->nodeLockerMask, nodeId-1);
988990
BIT_SET(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
989991
Mtm->nLockers += 1;
990992
MtmUnlock();
993+
} else {
994+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n", nodeId, MyWalSnd->sentPtr, GetXLogInsertRecPtr(), Mtm->nLockers);
991995
}
992996
return true;
993997
}
@@ -1024,7 +1028,7 @@ MtmCheckClusterLock()
10241028
break;
10251029
} else {
10261030
/* recovered replica catched up with master */
1027-
elog(WARNING, "WAL-sender %d complete receovery", i);
1031+
elog(WARNING, "WAL-sender %d complete recovery", i);
10281032
BIT_CLEAR(Mtm->walSenderLockerMask, i);
10291033
}
10301034
}
@@ -1610,8 +1614,9 @@ void MtmReceiverStarted(int nodeId)
16101614
if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
16111615
BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
16121616
if (++Mtm->nReceivers == Mtm->nNodes-1) {
1613-
Assert(Mtm->status == MTM_CONNECTED);
1614-
MtmSwitchClusterMode(MTM_ONLINE);
1617+
if (Mtm->status == MTM_CONNECTED) {
1618+
MtmSwitchClusterMode(MTM_ONLINE);
1619+
}
16151620
}
16161621
}
16171622
SpinLockRelease(&Mtm->spinlock);
@@ -1624,19 +1629,28 @@ void MtmReceiverStarted(int nodeId)
16241629
*/
16251630
MtmSlotMode MtmReceiverSlotMode(int nodeId)
16261631
{
1632+
bool recovery = false;
16271633
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) {
1634+
MTM_INFO("%d: receiver slot mode %s\n", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
16281635
if (Mtm->status == MTM_RECOVERY) {
1636+
recovery = true;
16291637
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) {
16301638
/* Choose for recovery first available slot */
1639+
elog(WARNING, "Start recovery from node %d", nodeId);
16311640
Mtm->recoverySlot = nodeId;
16321641
return SLOT_OPEN_EXISTED;
16331642
}
16341643
}
16351644
/* delay opening of other slots until recovery is completed */
16361645
MtmSleep(STATUS_POLL_DELAY);
16371646
}
1647+
if (recovery) {
1648+
elog(WARNING, "Recreate replication slot for node %d after end of recovery", nodeId);
1649+
} else {
1650+
MTM_INFO("%d: Reuse replication slot for node %d\n", MyProcPid, nodeId);
1651+
}
16381652
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1639-
return Mtm->recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
1653+
return recovery ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
16401654
}
16411655

16421656
static bool MtmIsBroadcast()
@@ -1692,7 +1706,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16921706
static bool
16931707
MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
16941708
{
1695-
return args->origin_id == InvalidRepOriginId || MtmIsRecoveredNode(MtmReplicationNodeId);
1709+
bool res = Mtm->status != MTM_RECOVERY
1710+
&& (args->origin_id == InvalidRepOriginId
1711+
|| MtmIsRecoveredNode(MtmReplicationNodeId));
1712+
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
1713+
return res;
16961714
}
16971715

16981716
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
2929
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
30+
#define MULTIMASTER_ADMIN "mtm_admin"
3031

3132
#define USEC 1000000
3233

contrib/mmts/pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_t csn = MtmTransactionSnapshot(txn->xid);
106-
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n", txn->xid, csn);
106+
MTM_INFO("%d: pglogical_write_begin %d CSN=%ld\n", MyProcPid, txn->xid, csn);
107107

108108
if (csn == INVALID_CSN && !isRecovery) {
109109
MtmIsFilteredTxn = true;

src/backend/replication/slot.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,11 @@ ReplicationSlotCreate(const char *name, bool db_specific,
248248
{
249249
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
250250

251-
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
251+
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) {
252252
ereport(ERROR,
253253
(errcode(ERRCODE_DUPLICATE_OBJECT),
254254
errmsg("replication slot \"%s\" already exists", name)));
255+
}
255256
if (!s->in_use && slot == NULL)
256257
slot = s;
257258
}

0 commit comments

Comments
 (0)