Skip to content

Commit c1f3c26

Browse files
committed
2 parents ec5fe25 + 8ea606c commit c1f3c26

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

contrib/mmts/multimaster.c

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
330330

331331
MtmLock(LW_SHARED);
332332
ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
333-
if (ts != NULL) {
333+
if (ts != NULL && !ts->isLocal) {
334334
snapshot = ts->snapshot;
335335
}
336336
MtmUnlock();
@@ -452,8 +452,8 @@ MtmAdjustOldestXid(TransactionId xid)
452452

453453
MtmLock(LW_EXCLUSIVE);
454454
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
455-
if (ts != NULL && ts->status == TRANSACTION_STATUS_COMMITTED) {
456-
csn_t oldestSnapshot = ts->csn;
455+
if (ts != NULL) {
456+
csn_t oldestSnapshot = ts->snapshot;
457457
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
458458
for (i = 0; i < Mtm->nAllNodes; i++) {
459459
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
@@ -483,8 +483,7 @@ MtmAdjustOldestXid(TransactionId xid)
483483
if (prev != NULL) {
484484
Mtm->transListHead = prev;
485485
Mtm->oldestXid = xid = prev->xid;
486-
} else {
487-
Assert(TransactionIdPrecedesOrEquals(Mtm->oldestXid, xid));
486+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
488487
xid = Mtm->oldestXid;
489488
}
490489
} else {
@@ -650,6 +649,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
650649
if (!found) {
651650
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
652651
ts->snapshot = x->snapshot;
652+
ts->isLocal = true;
653653
if (TransactionIdIsValid(x->gtid.xid)) {
654654
Assert(x->gtid.node != MtmNodeId);
655655
ts->gtid = x->gtid;
@@ -704,7 +704,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704704
/*
705705
* Invalid CSN prevent replication of transaction by logical replication
706706
*/
707-
ts->snapshot = x->isReplicated || !x->containsDML ? INVALID_CSN : x->snapshot;
707+
ts->isLocal = x->isReplicated || !x->containsDML;
708+
ts->snapshot = x->snapshot;
708709
ts->csn = MtmAssignCSN();
709710
ts->procno = MyProc->pgprocno;
710711
ts->nVotes = 1; /* I am voted myself */
@@ -834,7 +835,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
834835
Assert(TransactionIdIsValid(x->xid));
835836
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
836837
ts->status = TRANSACTION_STATUS_ABORTED;
837-
ts->snapshot = INVALID_CSN;
838+
ts->isLocal = true;
839+
ts->snapshot = x->snapshot;
838840
ts->csn = MtmAssignCSN();
839841
ts->gtid = x->gtid;
840842
ts->nSubxids = 0;

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ typedef struct MtmTransState
145145
struct MtmTransState* nextVoting; /* Next element in L1-list of voting transactions. */
146146
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
147147
bool votingCompleted; /* 2PC voting is completed */
148+
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
148149
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
149150
} MtmTransState;
150151

0 commit comments

Comments
 (0)