73
73
typedef struct {
74
74
TransactionId xid ; /* local transaction ID */
75
75
GlobalTransactionId gtid ; /* global transaction ID assigned by coordinator of transaction */
76
+ bool isTwoPhase ; /* user level 2PC */
76
77
bool isReplicated ; /* transaction on replica */
77
78
bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
78
79
bool isPrepared ; /* transaction is perpared at first stage of 2PC */
@@ -722,6 +723,7 @@ MtmResetTransaction()
722
723
x -> gtid .xid = InvalidTransactionId ;
723
724
x -> isDistributed = false;
724
725
x -> isPrepared = false;
726
+ x -> isTwoPhase = false;
725
727
x -> status = TRANSACTION_STATUS_UNKNOWN ;
726
728
}
727
729
@@ -749,6 +751,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
749
751
x -> isReplicated = MtmIsLogicalReceiver ;
750
752
x -> isDistributed = MtmIsUserTransaction ();
751
753
x -> isPrepared = false;
754
+ x -> isTwoPhase = false;
752
755
x -> isTransactionBlock = IsTransactionBlock ();
753
756
/* Application name can be changed usnig PGAPPNAME environment variable */
754
757
if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 ) {
@@ -909,8 +912,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
909
912
Assert (ts != NULL );
910
913
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
911
914
if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
912
- bool found ;
913
- MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , & found );
915
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
914
916
Assert (x -> gid [0 ]);
915
917
tm -> state = ts ;
916
918
ts -> votingCompleted = true;
@@ -928,8 +930,13 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
928
930
time_t transTimeout = Max (MSEC_TO_USEC (Mtm2PCMinTimeout ), (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100 );
929
931
int result = 0 ;
930
932
int nConfigChanges = Mtm -> nConfigChanges ;
931
-
932
933
timestamp_t start = MtmGetSystemTime ();
934
+
935
+ if (x -> isTwoPhase ) {
936
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
937
+ tm -> state = ts ;
938
+ }
939
+
933
940
/* Wait votes from all nodes until: */
934
941
while (!ts -> votingCompleted /* all nodes voted */
935
942
&& nConfigChanges == Mtm -> nConfigChanges /* configarion is changed */
@@ -985,7 +992,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
985
992
MtmLock (LW_EXCLUSIVE );
986
993
tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_REMOVE , NULL );
987
994
if (tm == NULL ) {
988
- elog (WARNING , "Global transaciton ID %s is not found" , x -> gid );
995
+ elog (WARNING , "Global transaciton ID '%s' is not found" , x -> gid );
989
996
} else {
990
997
Assert (tm -> state != NULL );
991
998
MTM_LOG1 ("Abort prepared transaction %d with gid='%s'" , x -> xid , x -> gid );
@@ -1268,7 +1275,7 @@ void MtmAbortTransaction(MtmTransState* ts)
1268
1275
Assert (MtmLockCount != 0 ); /* should be invoked with exclsuive lock */
1269
1276
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1270
1277
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
1271
- elog (WARNING , "Attempt to rollback already committed transaction %d (%s)" , ts -> xid , ts -> gid );
1278
+ elog (LOG , "Attempt to rollback already committed transaction %d (%s)" , ts -> xid , ts -> gid );
1272
1279
} else {
1273
1280
MTM_LOG1 ("Rollback active transaction %d:%d (local xid %d) status %d" , ts -> gtid .node , ts -> gtid .xid , ts -> xid , ts -> status );
1274
1281
ts -> status = TRANSACTION_STATUS_ABORTED ;
@@ -3804,11 +3811,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3804
3811
}
3805
3812
break ;
3806
3813
case TRANS_STMT_PREPARE :
3807
- elog (ERROR , "Two phase commit is not supported by multimaster" );
3808
- break ;
3809
3814
case TRANS_STMT_COMMIT_PREPARED :
3810
3815
case TRANS_STMT_ROLLBACK_PREPARED :
3811
- skipCommand = true;
3816
+ MtmTx .isTwoPhase = true;
3817
+ strcpy (MtmTx .gid , stmt -> gid );
3812
3818
break ;
3813
3819
default :
3814
3820
break ;
@@ -3966,8 +3972,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3966
3972
standard_ProcessUtility (parsetree , queryString , context ,
3967
3973
params , dest , completionTag );
3968
3974
}
3969
-
3970
- if (MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ && !MtmVolksWagenMode ) {
3975
+ if (!MtmVolksWagenMode && MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ ) {
3971
3976
elog (ERROR , "Isolation level %s is not supported by multimaster" , isoLevelStr [XactIsoLevel ]);
3972
3977
}
3973
3978
@@ -4175,7 +4180,7 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
4175
4180
}
4176
4181
MtmGetGtid (xid , & gtid );
4177
4182
hasDeadlock = MtmGraphFindLoop (& graph , & gtid );
4178
- elog (WARNING , "Distributed deadlock check by backend %d for %u:%u = %d" , MyProcPid , gtid .node , gtid .xid , hasDeadlock );
4183
+ elog (LOG , "Distributed deadlock check by backend %d for %u:%u = %d" , MyProcPid , gtid .node , gtid .xid , hasDeadlock );
4179
4184
if (!hasDeadlock ) {
4180
4185
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
4181
4186
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
0 commit comments