Skip to content

Commit bb988b1

Browse files
committed
some fixes in recovery
1 parent a380fe6 commit bb988b1

File tree

4 files changed

+49
-32
lines changed

4 files changed

+49
-32
lines changed

contrib/mmts/multimaster.c

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -636,9 +636,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
636636
x->isDistributed = MtmIsUserTransaction();
637637
x->isPrepared = false;
638638
x->isTransactionBlock = IsTransactionBlock();
639-
/* Application name can be cahnged usnig PGAPPNAME environment variable */
639+
/* Application name can be changed using PGAPPNAME environment variable */
640640
if (!IsBackgroundWorker && x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
641-
/* reject all user's transactions at offline cluster */
641+
/* Reject all user's transactions at offline cluster.
642+
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
643+
*/
642644
MtmUnlock();
643645
elog(ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
644646
}
@@ -674,14 +676,17 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
674676
if (Mtm->disabledNodeMask != 0) {
675677
MtmRefreshClusterStatus(true);
676678
if (!IsBackgroundWorker && Mtm->status != MTM_ONLINE) {
677-
elog(ERROR, "Abort current transaction because this cluster node is not online");
679+
/* Do not take into account bg-workers which are performing recovery */
680+
elog(ERROR, "Abort current transaction because this cluster node is in %s status", MtmNodeStatusMnem[Mtm->status]);
678681
}
679682
}
680683

681684
MtmLock(LW_EXCLUSIVE);
682685

683686
/*
684-
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
687+
* Check if there is a global multimaster lock preventing new transactions from committing, to give wal-senders a chance to catch up.
688+
* Only "own" transactions are blocked. Transactions replicated from other nodes (including recovered transactions) should be processed
689+
* and should not cause cluster status change.
685690
*/
686691
if (!x->isReplicated) {
687692
MtmCheckClusterLock();
@@ -717,7 +722,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
717722
}
718723
MtmTransactionListAppend(ts);
719724
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
720-
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d CSN=%ld\n", MyProcPid, x->xid, ts->csn);
725+
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n",
726+
MyProcPid, x->xid, ts->gtid.xid, ts->gtid.node, ts->csn);
721727
MtmUnlock();
722728

723729
}
@@ -843,14 +849,6 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
843849
}
844850
}
845851

846-
void MtmRecoveryCompleted(void)
847-
{
848-
elog(WARNING, "Recovery of node %d is completed", MtmNodeId);
849-
Mtm->recoverySlot = 0;
850-
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
851-
MtmSwitchClusterMode(MTM_ONLINE);
852-
}
853-
854852
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
855853
{
856854
MtmLock(LW_EXCLUSIVE);
@@ -934,6 +932,18 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
934932
* -------------------------------------------
935933
*/
936934

935+
void MtmRecoveryCompleted(void)
936+
{
937+
elog(WARNING, "Recovery of node %d is completed", MtmNodeId);
938+
MtmLock(LW_EXCLUSIVE);
939+
Mtm->recoverySlot = 0;
940+
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
941+
/* Mode will be changed to online once all logical receivers are connected */
942+
MtmSwitchClusterMode(MTM_CONNECTED);
943+
MtmUnlock();
944+
}
945+
946+
937947

938948
/**
939949
* Check state of replication slots. If some of them lag too far behind WAL, then drop these slots to avoid
@@ -994,10 +1004,10 @@ bool MtmIsRecoveredNode(int nodeId)
9941004
bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
9951005
{
9961006
bool caughtUp = false;
1007+
MtmLock(LW_EXCLUSIVE);
9971008
if (MtmIsRecoveredNode(nodeId)) {
9981009
XLogRecPtr walLSN = GetXLogInsertRecPtr();
999-
MtmLock(LW_EXCLUSIVE);
1000-
if (slotLSN == walLSN) {
1010+
if (slotLSN == walLSN && Mtm->nActiveTransactions == 0) {
10011011
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
10021012
elog(WARNING,"Node %d is caught-up", nodeId);
10031013
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
@@ -1019,18 +1029,17 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10191029
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
10201030
* Is there some better way to establish a mapping between nodes and WAL-senders?
10211031
*/
1022-
elog(WARNING,"Node %d is almost caught-up: lock cluster", nodeId);
1032+
elog(WARNING,"Node %d is almost caught-up: slot position %lx, WAL position %lx, active transactions %d",
1033+
nodeId, slotLSN, walLSN, Mtm->nActiveTransactions);
10231034
Assert(MyWalSnd != NULL); /* This function is called by WAL-sender, so it should not be NULL */
10241035
BIT_SET(Mtm->nodeLockerMask, nodeId-1);
10251036
BIT_SET(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
10261037
Mtm->nLockers += 1;
10271038
} else {
10281039
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d\n", nodeId, slotLSN, walLSN, MyWalSnd->sentPtr, Mtm->nLockers, Mtm->nActiveTransactions);
10291040
}
1030-
MtmUnlock();
1031-
} else {
1032-
MTM_INFO("Node %d is not in recovery mode\n", nodeId);
10331041
}
1042+
MtmUnlock();
10341043
return caughtUp;
10351044
}
10361045

@@ -1045,7 +1054,7 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10451054
/*
10461055
* If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
10471056
* WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
1048-
* This function is called at transaction start with multimaster lock set
1057+
* This function is called before transaction prepare with multimaster lock set.
10491058
*/
10501059
static void
10511060
MtmCheckClusterLock()
@@ -1072,8 +1081,8 @@ MtmCheckClusterLock()
10721081
}
10731082
}
10741083
if (mask != 0) {
1075-
/* some "almost catch-up" wal-senders are still working */
1076-
/* Do not start new transactions until them complete */
1084+
/* some "almost catch-up" wal-senders are still working. */
1085+
/* Do not start new transactions until they are completed. */
10771086
MtmUnlock();
10781087
MtmSleep(delay);
10791088
if (delay*2 <= MAX_WAIT_TIMEOUT) {
@@ -1216,6 +1225,7 @@ void MtmOnNodeDisconnect(int nodeId)
12161225
void MtmOnNodeConnect(int nodeId)
12171226
{
12181227
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
1228+
elog(NOTICE, "Reconnect node %d", nodeId);
12191229
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
12201230
}
12211231

@@ -1646,19 +1656,23 @@ _PG_fini(void)
16461656
}
16471657

16481658

1649-
1659+
/*
1660+
* This function is called by the pglogical receiver main function when the receiver background worker is started.
1661+
* We switch to ONLINE mode when all receivers are connected.
1662+
* Since a background worker can be restarted multiple times, use a node bitmask.
1663+
*/
16501664
void MtmReceiverStarted(int nodeId)
16511665
{
1652-
SpinLockAcquire(&Mtm->spinlock);
1666+
MtmLock(LW_EXCLUSIVE);
16531667
if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
16541668
BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
16551669
if (++Mtm->nReceivers == Mtm->nNodes-1) {
16561670
if (Mtm->status == MTM_CONNECTED) {
16571671
MtmSwitchClusterMode(MTM_ONLINE);
16581672
}
16591673
}
1660-
}
1661-
SpinLockRelease(&Mtm->spinlock);
1674+
}
1675+
MtmUnlock();
16621676
}
16631677

16641678
/*

contrib/mmts/multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ typedef uint64 csn_t; /* commit serial number */
4545

4646
#define PGLOGICAL_XACT_EVENT(flags) (flags & 0x03)
4747

48+
#define PGLOGICAL_CAUGHT_UP 0x04
49+
50+
4851
typedef uint64 timestamp_t;
4952

5053
/* Identifier of global transaction */

contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -499,12 +499,10 @@ process_remote_commit(StringInfo in)
499499
uint8 flags;
500500
csn_t csn;
501501
const char *gid = NULL;
502-
bool caughtUp;
503502

504503
/* read flags */
505504
flags = pq_getmsgbyte(in);
506505
MtmReplicationNode = pq_getmsgbyte(in);
507-
caughtUp = pq_getmsgbyte(in) != 0;
508506

509507
/* read fields */
510508
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
@@ -573,7 +571,7 @@ process_remote_commit(StringInfo in)
573571
Assert(false);
574572
}
575573
MtmEndSession(true);
576-
if (caughtUp) {
574+
if (flags & PGLOGICAL_CAUGHT_UP) {
577575
MtmRecoveryCompleted();
578576
}
579577
}

contrib/mmts/pglogical_proto.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_t csn = MtmTransactionSnapshot(txn->xid);
106-
MTM_INFO("%d: pglogical_write_begin %d CSN=%ld\n", MyProcPid, txn->xid, csn);
106+
MTM_INFO("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d\n", MyProcPid, txn->xid, MtmReplicationNodeId, csn, isRecovery);
107107

108108
if (csn == INVALID_CSN && !isRecovery) {
109109
MtmIsFilteredTxn = true;
@@ -124,7 +124,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
124124
ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
125125
{
126126
uint8 flags = 0;
127-
127+
128128
if (txn->xact_action == XLOG_XACT_COMMIT)
129129
flags = PGLOGICAL_COMMIT;
130130
else if (txn->xact_action == XLOG_XACT_PREPARE)
@@ -146,6 +146,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
if (csn == INVALID_CSN && !isRecovery) {
147147
return;
148148
}
149+
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {
150+
flags |= PGLOGICAL_CAUGHT_UP;
151+
}
149152
}
150153
pq_sendbyte(out, 'C'); /* sending COMMIT */
151154

@@ -154,7 +157,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
154157
/* send the flags field */
155158
pq_sendbyte(out, flags);
156159
pq_sendbyte(out, MtmNodeId);
157-
pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));
158160

159161
/* send fixed fields */
160162
pq_sendint64(out, commit_lsn);

0 commit comments

Comments
 (0)