@@ -130,6 +130,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
130
130
static void MtmPrePrepareTransaction (MtmCurrentTrans * x );
131
131
static void MtmPostPrepareTransaction (MtmCurrentTrans * x );
132
132
static void MtmAbortPreparedTransaction (MtmCurrentTrans * x );
133
+ static void MtmCommitPreparedTransaction (MtmCurrentTrans * x );
133
134
static void MtmEndTransaction (MtmCurrentTrans * x , bool commit );
134
135
static bool MtmTwoPhaseCommit (MtmCurrentTrans * x );
135
136
static TransactionId MtmGetOldestXmin (Relation rel , bool ignoreVacuum );
@@ -685,6 +686,9 @@ MtmXactCallback(XactEvent event, void *arg)
685
686
case XACT_EVENT_ABORT_PREPARED :
686
687
MtmAbortPreparedTransaction (& MtmTx );
687
688
break ;
689
+ case XACT_EVENT_COMMIT_PREPARED :
690
+ MtmCommitPreparedTransaction (& MtmTx );
691
+ break ;
688
692
case XACT_EVENT_COMMIT :
689
693
MtmEndTransaction (& MtmTx , true);
690
694
break ;
@@ -793,6 +797,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
793
797
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
794
798
ts -> snapshot = x -> snapshot ;
795
799
ts -> isLocal = true;
800
+ ts -> isTwoPhase = x -> isTwoPhase ;
796
801
if (!found ) {
797
802
ts -> isEnqueued = false;
798
803
ts -> isActive = false;
@@ -970,6 +975,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
970
975
x -> status = ts -> status ;
971
976
MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
972
977
MtmUnlock ();
978
+ if (x -> isTwoPhase ) {
979
+ MtmResetTransaction ();
980
+ }
973
981
}
974
982
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
975
983
if (Mtm -> inject2PCError == 3 ) {
@@ -980,6 +988,74 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980
988
MTM_TXTRACE (x , "PostPrepareTransaction Finish" );
981
989
}
982
990
991
+ static void
992
+ MtmCommitPreparedTransaction (MtmCurrentTrans * x )
993
+ {
994
+ MtmTransMap * tm ;
995
+ MtmTransState * ts ;
996
+
997
+ if (Mtm -> status == MTM_RECOVERY || x -> isReplicated || x -> isPrepared ) { /* Ignore auto-2PC originated by multimaster */
998
+ return ;
999
+ }
1000
+ MtmLock (LW_EXCLUSIVE );
1001
+ tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_FIND , NULL );
1002
+ if (tm == NULL ) {
1003
+ elog (WARNING , "Global transaciton ID '%s' is not found" , x -> gid );
1004
+ } else {
1005
+ time_t transTimeout = MSEC_TO_USEC (Mtm2PCMinTimeout );
1006
+ int nConfigChanges = Mtm -> nConfigChanges ;
1007
+ timestamp_t start = MtmGetSystemTime ();
1008
+ int result = 0 ;
1009
+
1010
+ Assert (tm -> state != NULL );
1011
+ MTM_LOG1 ("Commit prepared transaction %d with gid='%s'" , x -> xid , x -> gid );
1012
+ ts = tm -> state ;
1013
+
1014
+ Assert (MtmIsCoordinator (ts ));
1015
+
1016
+ ts -> votingCompleted = false;
1017
+ ts -> nVotes = 1 ; /* I voted myself */
1018
+ ts -> procno = MyProc -> pgprocno ;
1019
+ MTM_TXTRACE (ts , "Coordinator sends MSG_PREPARE" );
1020
+ MtmSend2PCMessage (ts , MSG_PREPARE );
1021
+
1022
+ /* Wait votes from all nodes until: */
1023
+ while (!ts -> votingCompleted /* all nodes voted */
1024
+ && nConfigChanges == Mtm -> nConfigChanges /* configarion is changed */
1025
+ && Mtm -> status == MTM_ONLINE /* node is not online */
1026
+ && ts -> status != TRANSACTION_STATUS_ABORTED /* transaction is aborted */
1027
+ && start + transTimeout >= MtmGetSystemTime ()) /* timeout is expired */
1028
+ {
1029
+ MtmUnlock ();
1030
+ MTM_TXTRACE (x , "CommitPreparedTransaction WaitLatch Start" );
1031
+ result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH , MtmHeartbeatRecvTimeout );
1032
+ MTM_TXTRACE (x , "CommitPreparedTransaction WaitLatch Finish" );
1033
+ /* Emergency bailout if postmaster has died */
1034
+ if (result & WL_POSTMASTER_DEATH ) {
1035
+ proc_exit (1 );
1036
+ }
1037
+ if (result & WL_LATCH_SET ) {
1038
+ MTM_LOG3 ("Latch signaled at %ld" , MtmGetSystemTime ());
1039
+ ResetLatch (& MyProc -> procLatch );
1040
+ }
1041
+ MtmLock (LW_EXCLUSIVE );
1042
+ }
1043
+ if (ts -> status != TRANSACTION_STATUS_ABORTED && (!ts -> votingCompleted || nConfigChanges != Mtm -> nConfigChanges )) {
1044
+ if (nConfigChanges != Mtm -> nConfigChanges ) {
1045
+ elog (WARNING , "Transaction %d (%s) is aborted because cluster configuration is changed during commit" , x -> xid , x -> gid );
1046
+ } else {
1047
+ elog (WARNING , "Transaction %d (%s) is aborted because of %d msec timeout expiration, prepare time %d msec" ,
1048
+ x -> xid , x -> gid , (int )USEC_TO_MSEC (transTimeout ), (int )USEC_TO_MSEC (ts -> csn - x -> snapshot ));
1049
+ }
1050
+ MtmAbortTransaction (ts );
1051
+ }
1052
+ x -> status = ts -> status ;
1053
+ x -> xid = ts -> xid ;
1054
+ x -> isPrepared = true;
1055
+ MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
1056
+ }
1057
+ MtmUnlock ();
1058
+ }
983
1059
984
1060
static void
985
1061
MtmAbortPreparedTransaction (MtmCurrentTrans * x )
@@ -1009,9 +1085,9 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
1009
1085
static void
1010
1086
MtmEndTransaction (MtmCurrentTrans * x , bool commit )
1011
1087
{
1012
- MTM_LOG3 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s" ,
1013
- MyProcPid , x -> xid , x -> isPrepared , x -> isReplicated , x -> isDistributed , x -> gid , commit ? "commit" : "abort" );
1014
- if (x -> status != TRANSACTION_STATUS_ABORTED && x -> isDistributed && (x -> isPrepared || x -> isReplicated )) {
1088
+ MTM_LOG1 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc =%d, gid=%s -> %s" ,
1089
+ MyProcPid , x -> xid , x -> isPrepared , x -> isReplicated , x -> isDistributed , x -> isTwoPhase , x -> gid , commit ? "commit" : "abort" );
1090
+ if (x -> status != TRANSACTION_STATUS_ABORTED && x -> isDistributed && (x -> isPrepared || x -> isReplicated ) && ! x -> isTwoPhase ) {
1015
1091
MtmTransState * ts = NULL ;
1016
1092
MtmLock (LW_EXCLUSIVE );
1017
1093
if (x -> isPrepared ) {
@@ -3820,9 +3896,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3820
3896
}
3821
3897
break ;
3822
3898
case TRANS_STMT_PREPARE :
3899
+ MtmTx .isTwoPhase = true;
3900
+ strcpy (MtmTx .gid , stmt -> gid );
3901
+ break ;
3902
+ /* nobreak */
3823
3903
case TRANS_STMT_COMMIT_PREPARED :
3824
3904
case TRANS_STMT_ROLLBACK_PREPARED :
3825
- MtmTx .isTwoPhase = true ;
3905
+ Assert (! MtmTx .isTwoPhase ) ;
3826
3906
strcpy (MtmTx .gid , stmt -> gid );
3827
3907
break ;
3828
3908
default :
0 commit comments