@@ -571,7 +571,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
571
571
MtmCheckClusterLock ();
572
572
573
573
ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
574
- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
574
+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
575
+ /*
576
+ * Invalid CSN prevent replication of transaction by logical replication
577
+ */
575
578
ts -> snapshot = x -> isReplicated || !x -> containsDML ? INVALID_CSN : x -> snapshot ;
576
579
ts -> csn = MtmAssignCSN ();
577
580
ts -> gtid = x -> gtid ;
@@ -603,7 +606,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
603
606
MtmTransState * ts ;
604
607
605
608
MtmLock (LW_EXCLUSIVE );
606
- ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
609
+ ts = hash_search (xid2state , & x -> xid , HASH_FIND , NULL );
607
610
Assert (ts != NULL );
608
611
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
609
612
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -619,7 +622,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
619
622
} else {
620
623
/* wait N commits or just one ABORT */
621
624
ts -> nVotes += 1 ; /* I vote myself */
622
- while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
625
+ while (ts -> nVotes != dtm -> nNodes && ts -> status != TRANSACTION_STATUS_ABORTED ) {
623
626
MtmUnlock ();
624
627
WaitLatch (& MyProc -> procLatch , WL_LATCH_SET , -1 );
625
628
ResetLatch (& MyProc -> procLatch );
@@ -628,6 +631,8 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
628
631
MtmUnlock ();
629
632
if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
630
633
elog (ERROR , "Distributed transaction %d is rejected by DTM" , x -> xid );
634
+ } else {
635
+ Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
631
636
}
632
637
}
633
638
}
@@ -637,7 +642,7 @@ static void
637
642
MtmEndTransaction (MtmCurrentTrans * x , bool commit )
638
643
{
639
644
MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" , MyProcPid , x -> xid , x -> isPrepared , x -> isDistributed , commit ? "commit" : "abort" );
640
- if (x -> isDistributed && (TransactionIdIsValid ( x -> xid ) || x -> isReplicated )) {
645
+ if (x -> isDistributed && (x -> isPrepared || x -> isReplicated )) {
641
646
MtmTransState * ts ;
642
647
MtmLock (LW_EXCLUSIVE );
643
648
if (x -> isPrepared ) {
@@ -656,7 +661,11 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
656
661
}
657
662
} else {
658
663
ts -> status = TRANSACTION_STATUS_ABORTED ;
659
- if (x -> isReplicated ) {
664
+ if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
665
+ /*
666
+ * Send notification only of ABORT happens during transaction processing at replicas,
667
+ * do not send notification if ABORT is receiver from master
668
+ */
660
669
MtmSendNotificationMessage (ts ); /* send notification to coordinator */
661
670
}
662
671
}
@@ -683,8 +692,6 @@ void MtmSendNotificationMessage(MtmTransState* ts)
683
692
}
684
693
}
685
694
686
-
687
-
688
695
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
689
696
{
690
697
csn_t localSnapshot ;
0 commit comments