Skip to content

Commit a33c861

Browse files
committed
2PC bug fixes
1 parent 87eb469 commit a33c861

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

contrib/mmts/arbiter.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
460460
buf->data[buf->used].code = ts->status == TRANSACTION_STATUS_ABORTED ? MSG_ABORTED : MSG_PREPARED;
461461
buf->data[buf->used].dxid = xid;
462462
buf->data[buf->used].sxid = ts->xid;
463-
buf->data[buf->used].csn = ts->csn;
463+
buf->data[buf->used].csn = ts->csn;
464464
buf->data[buf->used].node = MtmNodeId;
465465
buf->data[buf->used].disabledNodeMask = ds->disabledNodeMask;
466466
buf->used += 1;
@@ -495,6 +495,7 @@ static void MtmTransSender(Datum arg)
495495
MtmAppendBuffer(txBuffer, ts->gtid.xid, ts->gtid.node-1, ts);
496496
}
497497
ds->votingTransactions = NULL;
498+
498499
MtmUnlock();
499500

500501
for (i = 0; i < nNodes; i++) {
@@ -616,7 +617,8 @@ static void MtmTransReceiver(Datum arg)
616617
Assert (MtmIsCoordinator(ts));
617618
switch (msg->code) {
618619
case MSG_PREPARED:
619-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
620+
if (ts->status != TRANSACTION_STATUS_ABORTED) {
621+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS || ts->status == TRANSACTION_STATUS_UNKNOWN);
620622
if (msg->csn > ts->csn) {
621623
ts->csn = msg->csn;
622624
MtmSyncClock(ts->csn);
@@ -627,7 +629,8 @@ static void MtmTransReceiver(Datum arg)
627629
}
628630
break;
629631
case MSG_ABORTED:
630-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
632+
if (ts->status != TRANSACTION_STATUS_ABORTED) {
633+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS || ts->status == TRANSACTION_STATUS_UNKNOWN);
631634
ts->status = TRANSACTION_STATUS_ABORTED;
632635
MtmAdjustSubtransactions(ts);
633636
MtmWakeUpBackend(ts);

contrib/mmts/multimaster.c

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
571571
MtmCheckClusterLock();
572572

573573
ts = hash_search(xid2state, &x->xid, HASH_ENTER, NULL);
574-
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
574+
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
575+
/*
576+
* Invalid CSN prevent replication of transaction by logical replication
577+
*/
575578
ts->snapshot = x->isReplicated || !x->containsDML ? INVALID_CSN : x->snapshot;
576579
ts->csn = MtmAssignCSN();
577580
ts->gtid = x->gtid;
@@ -603,7 +606,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
603606
MtmTransState* ts;
604607

605608
MtmLock(LW_EXCLUSIVE);
606-
ts = hash_search(xid2state, &x->xid, HASH_ENTER, NULL);
609+
ts = hash_search(xid2state, &x->xid, HASH_FIND, NULL);
607610
Assert(ts != NULL);
608611
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
609612
ts->status = TRANSACTION_STATUS_UNKNOWN;
@@ -619,7 +622,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
619622
} else {
620623
/* wait N commits or just one ABORT */
621624
ts->nVotes += 1; /* I vote myself */
622-
while (ts->nVotes != dtm->nNodes && ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
625+
while (ts->nVotes != dtm->nNodes && ts->status != TRANSACTION_STATUS_ABORTED) {
623626
MtmUnlock();
624627
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
625628
ResetLatch(&MyProc->procLatch);
@@ -628,6 +631,8 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
628631
MtmUnlock();
629632
if (ts->status == TRANSACTION_STATUS_ABORTED) {
630633
elog(ERROR, "Distributed transaction %d is rejected by DTM", x->xid);
634+
} else {
635+
Assert(ts->status == TRANSACTION_STATUS_UNKNOWN);
631636
}
632637
}
633638
}
@@ -637,7 +642,7 @@ static void
637642
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
638643
{
639644
MTM_TRACE("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n", MyProcPid, x->xid, x->isPrepared, x->isDistributed, commit ? "commit" : "abort");
640-
if (x->isDistributed && (TransactionIdIsValid(x->xid) || x->isReplicated)) {
645+
if (x->isDistributed && (x->isPrepared || x->isReplicated)) {
641646
MtmTransState* ts;
642647
MtmLock(LW_EXCLUSIVE);
643648
if (x->isPrepared) {
@@ -656,7 +661,11 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
656661
}
657662
} else {
658663
ts->status = TRANSACTION_STATUS_ABORTED;
659-
if (x->isReplicated) {
664+
if (x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
665+
/*
666+
* Send notification only of ABORT happens during transaction processing at replicas,
667+
* do not send notification if ABORT is receiver from master
668+
*/
660669
MtmSendNotificationMessage(ts); /* send notification to coordinator */
661670
}
662671
}
@@ -683,8 +692,6 @@ void MtmSendNotificationMessage(MtmTransState* ts)
683692
}
684693
}
685694

686-
687-
688695
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
689696
{
690697
csn_t localSnapshot;

0 commit comments

Comments
 (0)