72
72
typedef struct {
73
73
TransactionId xid ; /* local transaction ID */
74
74
GlobalTransactionId gtid ; /* global transaction ID assigned by coordinator of transaction */
75
+ bool isTwoPhase ; /* user level 2PC */
75
76
bool isReplicated ; /* transaction on replica */
76
77
bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
77
78
bool isPrepared ; /* transaction is perpared at first stage of 2PC */
@@ -719,6 +720,7 @@ MtmResetTransaction()
719
720
x -> gtid .xid = InvalidTransactionId ;
720
721
x -> isDistributed = false;
721
722
x -> isPrepared = false;
723
+ x -> isTwoPhase = false;
722
724
x -> status = TRANSACTION_STATUS_UNKNOWN ;
723
725
}
724
726
@@ -746,6 +748,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
746
748
x -> isReplicated = MtmIsLogicalReceiver ;
747
749
x -> isDistributed = MtmIsUserTransaction ();
748
750
x -> isPrepared = false;
751
+ x -> isTwoPhase = false;
749
752
x -> isTransactionBlock = IsTransactionBlock ();
750
753
/* Application name can be changed usnig PGAPPNAME environment variable */
751
754
if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 ) {
@@ -906,8 +909,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
906
909
Assert (ts != NULL );
907
910
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
908
911
if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
909
- bool found ;
910
- MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , & found );
912
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
911
913
Assert (x -> gid [0 ]);
912
914
tm -> state = ts ;
913
915
ts -> votingCompleted = true;
@@ -925,8 +927,13 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
925
927
time_t transTimeout = Max (MSEC_TO_USEC (Mtm2PCMinTimeout ), (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100 );
926
928
int result = 0 ;
927
929
int nConfigChanges = Mtm -> nConfigChanges ;
928
-
929
930
timestamp_t start = MtmGetSystemTime ();
931
+
932
+ if (x -> isTwoPhase ) {
933
+ MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
934
+ tm -> state = ts ;
935
+ }
936
+
930
937
/* Wait votes from all nodes until: */
931
938
while (!ts -> votingCompleted /* all nodes voted */
932
939
&& nConfigChanges == Mtm -> nConfigChanges /* configarion is changed */
@@ -982,7 +989,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
982
989
MtmLock (LW_EXCLUSIVE );
983
990
tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_REMOVE , NULL );
984
991
if (tm == NULL ) {
985
- elog (WARNING , "Global transaciton ID %s is not found" , x -> gid );
992
+ elog (WARNING , "Global transaciton ID '%s' is not found" , x -> gid );
986
993
} else {
987
994
Assert (tm -> state != NULL );
988
995
MTM_LOG1 ("Abort prepared transaction %d with gid='%s'" , x -> xid , x -> gid );
@@ -1265,7 +1272,7 @@ void MtmAbortTransaction(MtmTransState* ts)
1265
1272
Assert (MtmLockCount != 0 ); /* should be invoked with exclsuive lock */
1266
1273
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1267
1274
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
1268
- elog (WARNING , "Attempt to rollback already committed transaction %d (%s)" , ts -> xid , ts -> gid );
1275
+ elog (LOG , "Attempt to rollback already committed transaction %d (%s)" , ts -> xid , ts -> gid );
1269
1276
} else {
1270
1277
MTM_LOG1 ("Rollback active transaction %d:%d (local xid %d) status %d" , ts -> gtid .node , ts -> gtid .xid , ts -> xid , ts -> status );
1271
1278
ts -> status = TRANSACTION_STATUS_ABORTED ;
@@ -3803,11 +3810,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3803
3810
}
3804
3811
break ;
3805
3812
case TRANS_STMT_PREPARE :
3806
- //elog(ERROR, "Two phase commit is not supported by multimaster");
3807
- break ;
3808
3813
case TRANS_STMT_COMMIT_PREPARED :
3809
3814
case TRANS_STMT_ROLLBACK_PREPARED :
3810
- skipCommand = true;
3815
+ MtmTx .isTwoPhase = true;
3816
+ strcpy (MtmTx .gid , stmt -> gid );
3811
3817
break ;
3812
3818
default :
3813
3819
break ;
@@ -3940,7 +3946,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3940
3946
params , dest , completionTag );
3941
3947
}
3942
3948
3943
- if (MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ ) {
3949
+ if (! MtmVolksWagenMode && MtmTx .isDistributed && XactIsoLevel != XACT_REPEATABLE_READ ) {
3944
3950
elog (ERROR , "Isolation level %s is not supported by multimaster" , isoLevelStr [XactIsoLevel ]);
3945
3951
}
3946
3952
@@ -4126,7 +4132,7 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
4126
4132
}
4127
4133
MtmGetGtid (xid , & gtid );
4128
4134
hasDeadlock = MtmGraphFindLoop (& graph , & gtid );
4129
- elog (WARNING , "Distributed deadlock check by backend %d for %u:%u = %d" , MyProcPid , gtid .node , gtid .xid , hasDeadlock );
4135
+ elog (LOG , "Distributed deadlock check by backend %d for %u:%u = %d" , MyProcPid , gtid .node , gtid .xid , hasDeadlock );
4130
4136
if (!hasDeadlock ) {
4131
4137
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
4132
4138
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
0 commit comments