Skip to content

Commit 291226a

Browse files
committed
Merge branch 'commit_hook'
2 parents 4054f6b + eb21867 commit 291226a

File tree

7 files changed

+51
-39
lines changed

7 files changed

+51
-39
lines changed

contrib/mmts/multimaster.c

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ typedef struct {
6464
bool isReplicated; /* transaction on replica */
6565
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666
bool isPrepared; /* transaction is perpared at first stage of 2PC */
67+
bool isTransactionBlock; /* is transaction block */
6768
bool containsDML; /* transaction contains DML statements */
6869
XidStatus status; /* transaction status */
6970
csn_t snapshot; /* transaction snaphsot */
@@ -111,6 +112,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
111112
static void MtmPostPrepareTransaction(MtmCurrentTrans* x);
112113
static void MtmAbortPreparedTransaction(MtmCurrentTrans* x);
113114
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
115+
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x);
114116
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
115117
static bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
116118
static TransactionId MtmAdjustOldestXid(TransactionId xid);
@@ -588,6 +590,11 @@ MtmXactCallback(XactEvent event, void *arg)
588590
case XACT_EVENT_ABORT:
589591
MtmEndTransaction(&MtmTx, false);
590592
break;
593+
case XACT_EVENT_COMMIT_COMMAND:
594+
if (!MtmTx.isTransactionBlock) {
595+
MtmTwoPhaseCommit(&MtmTx);
596+
}
597+
break;
591598
default:
592599
break;
593600
}
@@ -623,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
623630
x->isReplicated = false;
624631
x->isDistributed = MtmIsUserTransaction();
625632
x->isPrepared = false;
633+
x->isTransactionBlock = IsTransactionBlock();
626634
if (x->isDistributed && Mtm->status != MTM_ONLINE) {
627635
/* reject all user's transactions at offline cluster */
628636
MtmUnlock();
@@ -1922,33 +1930,34 @@ MtmGenerateGid(char* gid)
19221930
sprintf(gid, "MTM-%d-%d-%d", MtmNodeId, MyProcPid, ++localCount);
19231931
}
19241932

1925-
static void MtmTwoPhaseCommit(char *completionTag)
1933+
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
19261934
{
1927-
MtmGenerateGid(MtmTx.gid);
1928-
if (!IsTransactionBlock()) {
1929-
elog(WARNING, "Start transaction block for %d", MtmTx.xid);
1930-
BeginTransactionBlock();
1931-
CommitTransactionCommand();
1932-
StartTransactionCommand();
1933-
}
1934-
if (!PrepareTransactionBlock(MtmTx.gid))
1935-
{
1936-
elog(WARNING, "Failed to prepare transaction %s", MtmTx.gid);
1937-
/* report unsuccessful commit in completionTag */
1938-
if (completionTag) {
1939-
strcpy(completionTag, "ROLLBACK");
1935+
if (!x->isReplicated && (x->isDistributed && x->containsDML)) {
1936+
MtmGenerateGid(x->gid);
1937+
if (!x->isTransactionBlock) {
1938+
elog(WARNING, "Start transaction block for %s", x->gid);
1939+
BeginTransactionBlock();
1940+
x->isTransactionBlock = true;
1941+
CommitTransactionCommand();
1942+
StartTransactionCommand();
19401943
}
1941-
/* ??? Should we do explicit rollback */
1942-
} else {
1943-
CommitTransactionCommand();
1944-
StartTransactionCommand();
1945-
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
1946-
FinishPreparedTransaction(MtmTx.gid, false);
1947-
elog(ERROR, "Transaction %s is aborted by DTM", MtmTx.gid);
1948-
} else {
1949-
FinishPreparedTransaction(MtmTx.gid, true);
1944+
if (!PrepareTransactionBlock(x->gid))
1945+
{
1946+
elog(WARNING, "Failed to prepare transaction %s", x->gid);
1947+
/* ??? Should we do explicit rollback */
1948+
} else {
1949+
CommitTransactionCommand();
1950+
StartTransactionCommand();
1951+
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
1952+
FinishPreparedTransaction(x->gid, false);
1953+
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
1954+
} else {
1955+
FinishPreparedTransaction(x->gid, true);
1956+
}
19501957
}
1958+
return true;
19511959
}
1960+
return false;
19521961
}
19531962

19541963
static void MtmProcessUtility(Node *parsetree, const char *queryString,
@@ -1964,9 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19641973
TransactionStmt *stmt = (TransactionStmt *) parsetree;
19651974
switch (stmt->kind)
19661975
{
1976+
case TRANS_STMT_BEGIN:
1977+
MtmTx.isTransactionBlock = true;
1978+
break;
19671979
case TRANS_STMT_COMMIT:
1968-
if (MtmTx.isDistributed && MtmTx.containsDML) {
1969-
MtmTwoPhaseCommit(completionTag);
1980+
if (MtmTwoPhaseCommit(&MtmTx)) {
19701981
return;
19711982
}
19721983
break;
@@ -2002,9 +2013,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
20022013
if (MtmProcessDDLCommand(queryString)) {
20032014
return;
20042015
}
2005-
if (MtmTx.isDistributed && MtmTx.containsDML && !IsTransactionBlock()) {
2006-
MtmTwoPhaseCommit(completionTag);
2007-
}
20082016
}
20092017
if (PreviousProcessUtilityHook != NULL)
20102018
{
@@ -2034,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20342042
}
20352043
}
20362044
}
2037-
if (MtmTx.isDistributed && MtmTx.containsDML && !IsTransactionBlock()) {
2038-
MtmTwoPhaseCommit(NULL);
2039-
}
20402045
}
20412046
if (PreviousExecutorFinishHook != NULL)
20422047
{

contrib/mmts/pglogical_apply.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,13 +480,15 @@ MtmBeginSession(void)
480480
}
481481

482482
static void
483-
MtmEndSession(void)
483+
MtmEndSession(bool unlock)
484484
{
485485
if (replorigin_session_origin != InvalidRepOriginId) {
486486
MTM_TRACE("%d: Begin reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
487487
replorigin_session_origin = InvalidRepOriginId;
488488
replorigin_session_reset();
489-
MtmUnlockNode(MtmReplicationNode);
489+
if (unlock) {
490+
MtmUnlockNode(MtmReplicationNode);
491+
}
490492
MTM_TRACE("%d: End reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
491493
}
492494
}
@@ -568,7 +570,7 @@ process_remote_commit(StringInfo in)
568570
default:
569571
Assert(false);
570572
}
571-
MtmEndSession();
573+
MtmEndSession(true);
572574
}
573575

574576
static void
@@ -935,7 +937,7 @@ void MtmExecutor(int id, void* work, size_t size)
935937
EmitErrorReport();
936938
FlushErrorState();
937939
MTM_TRACE("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
938-
MtmEndSession();
940+
MtmEndSession(false);
939941
AbortCurrentTransaction();
940942
MTM_TRACE("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
941943
}

contrib/mmts/tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
// r = txn.exec("select mtm_get_snapshot()");
132+
r = txn.exec("select mtm_get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

src/backend/access/transam/twophase.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1248,7 +1248,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12481248

12491249
hdr = (TwoPhaseFileHeader *) xlrec;
12501250
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1251-
1251+
12521252
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
12531253
bufptr += MAXALIGN(hdr->gidlen);
12541254

src/backend/access/transam/xact.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2728,6 +2728,8 @@ CommitTransactionCommand(void)
27282728
{
27292729
TransactionState s = CurrentTransactionState;
27302730

2731+
CallXactCallbacks(XACT_EVENT_COMMIT_COMMAND);
2732+
27312733
switch (s->blockState)
27322734
{
27332735
/*

src/backend/utils/cache/inval.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,10 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
603603
else if (msg->rm.dbId == MyDatabaseId)
604604
InvalidateCatalogSnapshot();
605605
}
606-
else
606+
else {
607+
*(int*) 0 = 0;
607608
elog(FATAL, "unrecognized SI message ID: %d", msg->id);
609+
}
608610
}
609611

610612
/*

src/include/access/xact.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ typedef enum
9292
XACT_EVENT_PRE_PREPARE,
9393
XACT_EVENT_POST_PREPARE,
9494
XACT_EVENT_COMMIT_PREPARED,
95-
XACT_EVENT_ABORT_PREPARED
95+
XACT_EVENT_ABORT_PREPARED,
96+
XACT_EVENT_COMMIT_COMMAND
9697
} XactEvent;
9798

9899
typedef void (*XactCallback) (XactEvent event, void *arg);

0 commit comments

Comments
 (0)