@@ -64,6 +64,7 @@ typedef struct {
64
64
bool isReplicated ; /* transaction on replica */
65
65
bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
66
66
bool isPrepared ; /* transaction is perpared at first stage of 2PC */
67
+ bool isTransactionBlock ; /* is transaction block */
67
68
bool containsDML ; /* transaction contains DML statements */
68
69
XidStatus status ; /* transaction status */
69
70
csn_t snapshot ; /* transaction snaphsot */
@@ -111,6 +112,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
111
112
static void MtmPostPrepareTransaction (MtmCurrentTrans * x );
112
113
static void MtmAbortPreparedTransaction (MtmCurrentTrans * x );
113
114
static void MtmEndTransaction (MtmCurrentTrans * x , bool commit );
115
+ static bool MtmTwoPhaseCommit (MtmCurrentTrans * x );
114
116
static TransactionId MtmGetOldestXmin (Relation rel , bool ignoreVacuum );
115
117
static bool MtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot );
116
118
static TransactionId MtmAdjustOldestXid (TransactionId xid );
@@ -588,6 +590,11 @@ MtmXactCallback(XactEvent event, void *arg)
588
590
case XACT_EVENT_ABORT :
589
591
MtmEndTransaction (& MtmTx , false);
590
592
break ;
593
+ case XACT_EVENT_COMMIT_COMMAND :
594
+ if (!MtmTx .isTransactionBlock ) {
595
+ MtmTwoPhaseCommit (& MtmTx );
596
+ }
597
+ break ;
591
598
default :
592
599
break ;
593
600
}
@@ -623,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
623
630
x -> isReplicated = false;
624
631
x -> isDistributed = MtmIsUserTransaction ();
625
632
x -> isPrepared = false;
633
+ x -> isTransactionBlock = IsTransactionBlock ();
626
634
if (x -> isDistributed && Mtm -> status != MTM_ONLINE ) {
627
635
/* reject all user's transactions at offline cluster */
628
636
MtmUnlock ();
@@ -1922,33 +1930,34 @@ MtmGenerateGid(char* gid)
1922
1930
sprintf (gid , "MTM-%d-%d-%d" , MtmNodeId , MyProcPid , ++ localCount );
1923
1931
}
1924
1932
1925
- static void MtmTwoPhaseCommit (char * completionTag )
1933
+ static bool MtmTwoPhaseCommit (MtmCurrentTrans * x )
1926
1934
{
1927
- MtmGenerateGid (MtmTx .gid );
1928
- if (!IsTransactionBlock ()) {
1929
- elog (WARNING , "Start transaction block for %d" , MtmTx .xid );
1930
- BeginTransactionBlock ();
1931
- CommitTransactionCommand ();
1932
- StartTransactionCommand ();
1933
- }
1934
- if (!PrepareTransactionBlock (MtmTx .gid ))
1935
- {
1936
- elog (WARNING , "Failed to prepare transaction %s" , MtmTx .gid );
1937
- /* report unsuccessful commit in completionTag */
1938
- if (completionTag ) {
1939
- strcpy (completionTag , "ROLLBACK" );
1935
+ if (!x -> isReplicated && (x -> isDistributed && x -> containsDML )) {
1936
+ MtmGenerateGid (x -> gid );
1937
+ if (!x -> isTransactionBlock ) {
1938
+ elog (WARNING , "Start transaction block for %s" , x -> gid );
1939
+ BeginTransactionBlock ();
1940
+ x -> isTransactionBlock = true;
1941
+ CommitTransactionCommand ();
1942
+ StartTransactionCommand ();
1940
1943
}
1941
- /* ??? Should we do explicit rollback */
1942
- } else {
1943
- CommitTransactionCommand ();
1944
- StartTransactionCommand ();
1945
- if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
1946
- FinishPreparedTransaction (MtmTx .gid , false);
1947
- elog (ERROR , "Transaction %s is aborted by DTM" , MtmTx .gid );
1948
- } else {
1949
- FinishPreparedTransaction (MtmTx .gid , true);
1944
+ if (!PrepareTransactionBlock (x -> gid ))
1945
+ {
1946
+ elog (WARNING , "Failed to prepare transaction %s" , x -> gid );
1947
+ /* ??? Should we do explicit rollback */
1948
+ } else {
1949
+ CommitTransactionCommand ();
1950
+ StartTransactionCommand ();
1951
+ if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
1952
+ FinishPreparedTransaction (x -> gid , false);
1953
+ elog (ERROR , "Transaction %s is aborted by DTM" , x -> gid );
1954
+ } else {
1955
+ FinishPreparedTransaction (x -> gid , true);
1956
+ }
1950
1957
}
1958
+ return true;
1951
1959
}
1960
+ return false;
1952
1961
}
1953
1962
1954
1963
static void MtmProcessUtility (Node * parsetree , const char * queryString ,
@@ -1964,9 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
1964
1973
TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
1965
1974
switch (stmt -> kind )
1966
1975
{
1976
+ case TRANS_STMT_BEGIN :
1977
+ MtmTx .isTransactionBlock = true;
1978
+ break ;
1967
1979
case TRANS_STMT_COMMIT :
1968
- if (MtmTx .isDistributed && MtmTx .containsDML ) {
1969
- MtmTwoPhaseCommit (completionTag );
1980
+ if (MtmTwoPhaseCommit (& MtmTx )) {
1970
1981
return ;
1971
1982
}
1972
1983
break ;
@@ -2002,9 +2013,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2002
2013
if (MtmProcessDDLCommand (queryString )) {
2003
2014
return ;
2004
2015
}
2005
- if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2006
- MtmTwoPhaseCommit (completionTag );
2007
- }
2008
2016
}
2009
2017
if (PreviousProcessUtilityHook != NULL )
2010
2018
{
@@ -2034,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
2034
2042
}
2035
2043
}
2036
2044
}
2037
- if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2038
- MtmTwoPhaseCommit (NULL );
2039
- }
2040
2045
}
2041
2046
if (PreviousExecutorFinishHook != NULL )
2042
2047
{
0 commit comments