Skip to content

Commit 50a052c

Browse files
knizhnikkelvich
authored andcommitted
Recvoery bug fixing
1 parent 7795f88 commit 50a052c

File tree

4 files changed

+22
-11
lines changed

4 files changed

+22
-11
lines changed

arbiter.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ static void MtmOpenConnections()
386386
static bool MtmSendToNode(int node, void const* buf, int size)
387387
{
388388
while (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
389-
elog(WARNING, "Arbiter failed to write socket: %d", errno);
389+
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
390390
if (sockets[node] >= 0) {
391391
close(sockets[node]);
392392
}
@@ -395,6 +395,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
395395
MtmOnNodeDisconnect(node+1);
396396
return false;
397397
}
398+
elog(NOTICE, "Arbiter restablish connection with node %d", node+1);
398399
}
399400
return true;
400401
}

multimaster.c

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
672672

673673
if (Mtm->disabledNodeMask != 0) {
674674
MtmRefreshClusterStatus(true);
675-
if (Mtm->status != MTM_ONLINE) {
675+
if (!IsBackgroundWorker && Mtm->status != MTM_ONLINE) {
676676
elog(ERROR, "Abort current transaction because this cluster node is not online");
677677
}
678678
}
@@ -682,7 +682,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
682682
/*
683683
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
684684
*/
685-
MtmCheckClusterLock();
685+
if (!x->isReplicated) {
686+
MtmCheckClusterLock();
687+
}
686688

687689
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
688690
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
@@ -994,21 +996,23 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
994996
if (MtmIsRecoveredNode(nodeId)) {
995997
XLogRecPtr walLSN = GetXLogInsertRecPtr();
996998
MtmLock(LW_EXCLUSIVE);
999+
#if 0
9971000
if (slotLSN == walLSN) {
9981001
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
9991002
elog(WARNING,"Node %d is caught-up", nodeId);
1000-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10011003
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
10021004
BIT_CLEAR(Mtm->nodeLockerMask, nodeId-1);
10031005
Mtm->nLockers -= 1;
10041006
} else {
10051007
elog(WARNING,"Node %d is caugth-up without locking cluster", nodeId);
10061008
/* We are lucky: caugth-up without locking cluster! */
1007-
Mtm->nNodes += 1;
1008-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10091009
}
1010+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1011+
Mtm->nNodes += 1;
10101012
caughtUp = true;
1011-
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
1013+
} else
1014+
#endif
1015+
if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
10121016
&& slotLSN + MtmMinRecoveryLag > walLSN)
10131017
{
10141018
/*
@@ -2248,7 +2252,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22482252

22492253
void MtmExecute(void* work, int size)
22502254
{
2251-
BgwPoolExecute(&Mtm->pool, work, size);
2255+
if (Mtm->status == MTM_RECOVERY) {
2256+
/* During recovery apply changes sequentially to preserve commit order */
2257+
MtmExecutor(0, work, size);
2258+
} else {
2259+
BgwPoolExecute(&Mtm->pool, work, size);
2260+
}
22522261
}
22532262

22542263
static BgwPool*

pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,12 @@ process_remote_commit(StringInfo in)
497497
uint8 flags;
498498
csn_t csn;
499499
const char *gid = NULL;
500-
bool caughtUp;
500+
bool caughtUp = false;
501501

502502
/* read flags */
503503
flags = pq_getmsgbyte(in);
504504
MtmReplicationNode = pq_getmsgbyte(in);
505-
caughtUp = pq_getmsgbyte(in) != 0;
505+
/*caughtUp = pq_getmsgbyte(in) != 0;*/
506506

507507
/* read fields */
508508
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */

pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
147147
return;
148148
}
149149
}
150+
MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn);
150151

151152
pq_sendbyte(out, 'C'); /* sending COMMIT */
152153

@@ -155,7 +156,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155156
/* send the flags field */
156157
pq_sendbyte(out, flags);
157158
pq_sendbyte(out, MtmNodeId);
158-
pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));
159+
/*pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));*/
159160

160161
/* send fixed fields */
161162
pq_sendint64(out, commit_lsn);

0 commit comments

Comments
 (0)