Skip to content

Commit 4147df0

Browse files
committed
Recovery bug fixing
1 parent 2956510 commit 4147df0

File tree

5 files changed

+26
-12
lines changed

5 files changed

+26
-12
lines changed

contrib/mmts/arbiter.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ static void MtmOpenConnections()
386386
static bool MtmSendToNode(int node, void const* buf, int size)
387387
{
388388
while (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
389-
elog(WARNING, "Arbiter failed to write socket: %d", errno);
389+
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
390390
if (sockets[node] >= 0) {
391391
close(sockets[node]);
392392
}
@@ -395,6 +395,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
395395
MtmOnNodeDisconnect(node+1);
396396
return false;
397397
}
398+
elog(NOTICE, "Arbiter restablish connection with node %d", node+1);
398399
}
399400
return true;
400401
}

contrib/mmts/multimaster.c

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
673673

674674
if (Mtm->disabledNodeMask != 0) {
675675
MtmRefreshClusterStatus(true);
676-
if (Mtm->status != MTM_ONLINE) {
676+
if (!IsBackgroundWorker && Mtm->status != MTM_ONLINE) {
677677
elog(ERROR, "Abort current transaction because this cluster node is not online");
678678
}
679679
}
@@ -683,7 +683,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
683683
/*
684684
* Check whether a global multimaster lock is held: it prevents new transactions from committing, to give wal-senders a chance to catch up
685685
*/
686-
MtmCheckClusterLock();
686+
if (!x->isReplicated) {
687+
MtmCheckClusterLock();
688+
}
687689

688690
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
689691
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
@@ -995,21 +997,23 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
995997
if (MtmIsRecoveredNode(nodeId)) {
996998
XLogRecPtr walLSN = GetXLogInsertRecPtr();
997999
MtmLock(LW_EXCLUSIVE);
1000+
#if 0
9981001
if (slotLSN == walLSN) {
9991002
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
10001003
elog(WARNING,"Node %d is caught-up", nodeId);
1001-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10021004
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
10031005
BIT_CLEAR(Mtm->nodeLockerMask, nodeId-1);
10041006
Mtm->nLockers -= 1;
10051007
} else {
10061008
elog(WARNING,"Node %d is caugth-up without locking cluster", nodeId);
10071009
/* We are lucky: caught up without locking cluster! */
1008-
Mtm->nNodes += 1;
1009-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10101010
}
1011+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1012+
Mtm->nNodes += 1;
10111013
caughtUp = true;
1012-
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
1014+
} else
1015+
#endif
1016+
if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
10131017
&& slotLSN + MtmMinRecoveryLag > walLSN)
10141018
{
10151019
/*
@@ -2250,7 +2254,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22502254

22512255
void MtmExecute(void* work, int size)
22522256
{
2253-
BgwPoolExecute(&Mtm->pool, work, size);
2257+
if (Mtm->status == MTM_RECOVERY) {
2258+
/* During recovery apply changes sequentially to preserve commit order */
2259+
MtmExecutor(0, work, size);
2260+
} else {
2261+
BgwPoolExecute(&Mtm->pool, work, size);
2262+
}
22542263
}
22552264

22562265
static BgwPool*

contrib/mmts/pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,12 +499,12 @@ process_remote_commit(StringInfo in)
499499
uint8 flags;
500500
csn_t csn;
501501
const char *gid = NULL;
502-
bool caughtUp;
502+
bool caughtUp = false;
503503

504504
/* read flags */
505505
flags = pq_getmsgbyte(in);
506506
MtmReplicationNode = pq_getmsgbyte(in);
507-
caughtUp = pq_getmsgbyte(in) != 0;
507+
/*caughtUp = pq_getmsgbyte(in) != 0;*/
508508

509509
/* read fields */
510510
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */

contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
147147
return;
148148
}
149149
}
150+
MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn);
150151

151152
pq_sendbyte(out, 'C'); /* sending COMMIT */
152153

@@ -155,7 +156,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155156
/* send the flags field */
156157
pq_sendbyte(out, flags);
157158
pq_sendbyte(out, MtmNodeId);
158-
pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));
159+
/*pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));*/
159160

160161
/* send fixed fields */
161162
pq_sendint64(out, commit_lsn);

src/backend/replication/logical/decode.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
488488
{
489489
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
490490
TimestampTz commit_time = parsed->xact_time;
491-
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
491+
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
492492
int i;
493493

494494
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -541,6 +541,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541541
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
542542
FilterByOrigin(ctx, origin_id))
543543
{
544+
elog(WARNING, "%d: WAL-SENDER ignore record %lx with origin %d: SnapBuildXactNeedsSkip=%d, FilterByOrigin=%d",
545+
getpid(), buf->origptr, origin_id,
546+
SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr), FilterByOrigin(ctx, origin_id));
544547
for (i = 0; i < parsed->nsubxacts; i++)
545548
{
546549
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);

0 commit comments

Comments
 (0)