@@ -668,6 +668,29 @@ MtmBeginTransaction(MtmCurrentTrans* x)
668
668
}
669
669
}
670
670
671
+
672
+ static MtmTransState *
673
+ MtmCreateTransState (MtmCurrentTrans * x )
674
+ {
675
+ bool found ;
676
+ MtmTransState * ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , & found );
677
+ if (!found ) {
678
+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
679
+ ts -> snapshot = x -> snapshot ;
680
+ if (TransactionIdIsValid (x -> gtid .xid )) {
681
+ Assert (x -> gtid .node != MtmNodeId );
682
+ ts -> gtid = x -> gtid ;
683
+ } else {
684
+ /* I am coordinator of transaction */
685
+ ts -> gtid .xid = x -> xid ;
686
+ ts -> gtid .node = MtmNodeId ;
687
+ }
688
+ }
689
+ return ts ;
690
+ }
691
+
692
+
693
+
671
694
/*
672
695
* Prepare transaction for two-phase commit.
673
696
* This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
@@ -676,7 +699,7 @@ static void
676
699
MtmPrePrepareTransaction (MtmCurrentTrans * x )
677
700
{
678
701
MtmTransState * ts ;
679
- TransactionId * subxids ;
702
+ TransactionId * subxids ;
680
703
681
704
if (!x -> isDistributed ) {
682
705
return ;
@@ -704,14 +727,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704
727
MtmCheckClusterLock ();
705
728
}
706
729
707
- ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , NULL );
708
- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
730
+ ts = MtmCreateTransState (x );
709
731
/*
710
732
* Invalid CSN prevent replication of transaction by logical replication
711
733
*/
712
734
ts -> snapshot = x -> isReplicated || !x -> containsDML ? INVALID_CSN : x -> snapshot ;
713
735
ts -> csn = MtmAssignCSN ();
714
- ts -> gtid = x -> gtid ;
715
736
ts -> procno = MyProc -> pgprocno ;
716
737
ts -> nVotes = 1 ; /* I am voted myself */
717
738
ts -> votingCompleted = false;
@@ -723,15 +744,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
723
744
x -> csn = ts -> csn ;
724
745
725
746
Mtm -> transCount += 1 ;
726
-
727
- if (TransactionIdIsValid (x -> gtid .xid )) {
728
- Assert (x -> gtid .node != MtmNodeId );
729
- ts -> gtid = x -> gtid ;
730
- } else {
731
- /* I am coordinator of transaction */
732
- ts -> gtid .xid = x -> xid ;
733
- ts -> gtid .node = MtmNodeId ;
734
- }
735
747
MtmTransactionListAppend (ts );
736
748
MtmAddSubtransactions (ts , subxids , ts -> nSubxids );
737
749
MTM_TRACE ("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n" ,
@@ -845,7 +857,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
845
857
MtmTransactionListAppend (ts );
846
858
}
847
859
MtmSendNotificationMessage (ts , MSG_ABORTED ); /* send notification to coordinator */
848
- }
860
+ } else if (x -> status == TRANSACTION_STATUS_ABORTED && x -> isReplicated && !x -> isPrepared ) {
861
+ hash_search (MtmXid2State , & x -> xid , HASH_REMOVE , NULL );
862
+ }
849
863
MtmUnlock ();
850
864
}
851
865
MtmResetTransaction (x );
@@ -869,28 +883,32 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
869
883
870
884
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
871
885
{
886
+ MtmTx .gtid = * gtid ;
887
+ MtmTx .xid = GetCurrentTransactionId ();
888
+ MtmTx .isReplicated = true;
889
+ MtmTx .isDistributed = true;
890
+ MtmTx .containsDML = true;
891
+
872
892
if (globalSnapshot != INVALID_CSN ) {
873
893
MtmLock (LW_EXCLUSIVE );
874
894
MtmSyncClock (globalSnapshot );
895
+ MtmTx .snapshot = globalSnapshot ;
896
+ if (Mtm -> status != MTM_RECOVERY ) {
897
+ MtmCreateTransState (& MtmTx ); /* we need local->remote xid mapping for deadlock detection */
898
+ }
875
899
MtmUnlock ();
876
900
} else {
877
901
globalSnapshot = MtmTx .snapshot ;
878
902
}
879
903
if (!TransactionIdIsValid (gtid -> xid )) {
880
904
/* In case of recovery InvalidTransactionId is passed */
881
905
if (Mtm -> status != MTM_RECOVERY ) {
882
- elog (PANIC , "Node %d tries to recover node %d which is in %s mode" , MtmReplicationNodeId , MtmNodeId , MtmNodeStatusMnem [Mtm -> status ]);
906
+ elog (PANIC , "Node %d tries to recover node %d which is in %s mode" , gtid -> node , MtmNodeId , MtmNodeStatusMnem [Mtm -> status ]);
883
907
}
884
908
} else if (Mtm -> status == MTM_RECOVERY ) {
885
909
/* When recovery is completed we get normal transaction ID and switch to normal mode */
886
910
MtmRecoveryCompleted ();
887
911
}
888
- MtmTx .gtid = * gtid ;
889
- MtmTx .xid = GetCurrentTransactionId ();
890
- MtmTx .snapshot = globalSnapshot ;
891
- MtmTx .isReplicated = true;
892
- MtmTx .isDistributed = true;
893
- MtmTx .containsDML = true;
894
912
}
895
913
896
914
void MtmSetCurrentTransactionGID (char const * gid )
0 commit comments