Skip to content

Commit 8072031

Browse files
committed
Support users 2PC
1 parent 9958c15 commit 8072031

File tree

4 files changed

+95
-13
lines changed

4 files changed

+95
-13
lines changed

contrib/mmts/arbiter.c

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -997,15 +997,18 @@ static void MtmReceiver(Datum arg)
997997
/* All nodes are finished their transactions */
998998
if (ts->status == TRANSACTION_STATUS_ABORTED) {
999999
MtmWakeUpBackend(ts);
1000-
} else if (MtmUseDtm) {
1001-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1002-
ts->nVotes = 1; /* I voted myself */
1003-
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PREPARE");
1004-
MtmSend2PCMessage(ts, MSG_PREPARE);
10051000
} else {
10061001
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1007-
ts->status = TRANSACTION_STATUS_UNKNOWN;
1008-
MtmWakeUpBackend(ts);
1002+
if (ts->isTwoPhase) {
1003+
MtmWakeUpBackend(ts);
1004+
} else if (MtmUseDtm) {
1005+
ts->nVotes = 1; /* I voted myself */
1006+
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PREPARE");
1007+
MtmSend2PCMessage(ts, MSG_PREPARE);
1008+
} else {
1009+
ts->status = TRANSACTION_STATUS_UNKNOWN;
1010+
MtmWakeUpBackend(ts);
1011+
}
10091012
}
10101013
}
10111014
}

contrib/mmts/multimaster.c

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ static void MtmBeginTransaction(MtmCurrentTrans* x);
130130
static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
131131
static void MtmPostPrepareTransaction(MtmCurrentTrans* x);
132132
static void MtmAbortPreparedTransaction(MtmCurrentTrans* x);
133+
static void MtmCommitPreparedTransaction(MtmCurrentTrans* x);
133134
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
134135
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x);
135136
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -685,6 +686,9 @@ MtmXactCallback(XactEvent event, void *arg)
685686
case XACT_EVENT_ABORT_PREPARED:
686687
MtmAbortPreparedTransaction(&MtmTx);
687688
break;
689+
case XACT_EVENT_COMMIT_PREPARED:
690+
MtmCommitPreparedTransaction(&MtmTx);
691+
break;
688692
case XACT_EVENT_COMMIT:
689693
MtmEndTransaction(&MtmTx, true);
690694
break;
@@ -793,6 +797,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
793797
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
794798
ts->snapshot = x->snapshot;
795799
ts->isLocal = true;
800+
ts->isTwoPhase = x->isTwoPhase;
796801
if (!found) {
797802
ts->isEnqueued = false;
798803
ts->isActive = false;
@@ -970,6 +975,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
970975
x->status = ts->status;
971976
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
972977
MtmUnlock();
978+
if (x->isTwoPhase) {
979+
MtmResetTransaction();
980+
}
973981
}
974982
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
975983
if (Mtm->inject2PCError == 3) {
@@ -980,6 +988,74 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980988
MTM_TXTRACE(x, "PostPrepareTransaction Finish");
981989
}
982990

991+
static void
992+
MtmCommitPreparedTransaction(MtmCurrentTrans* x)
993+
{
994+
MtmTransMap* tm;
995+
MtmTransState* ts;
996+
997+
if (Mtm->status == MTM_RECOVERY || x->isReplicated || x->isPrepared) { /* Ignore auto-2PC originated by multimaster */
998+
return;
999+
}
1000+
MtmLock(LW_EXCLUSIVE);
1001+
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_FIND, NULL);
1002+
if (tm == NULL) {
1003+
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
1004+
} else {
1005+
time_t transTimeout = MSEC_TO_USEC(Mtm2PCMinTimeout);
1006+
int nConfigChanges = Mtm->nConfigChanges;
1007+
timestamp_t start = MtmGetSystemTime();
1008+
int result = 0;
1009+
1010+
Assert(tm->state != NULL);
1011+
MTM_LOG1("Commit prepared transaction %d with gid='%s'", x->xid, x->gid);
1012+
ts = tm->state;
1013+
1014+
Assert(MtmIsCoordinator(ts));
1015+
1016+
ts->votingCompleted = false;
1017+
ts->nVotes = 1; /* I voted myself */
1018+
ts->procno = MyProc->pgprocno;
1019+
MTM_TXTRACE(ts, "Coordinator sends MSG_PREPARE");
1020+
MtmSend2PCMessage(ts, MSG_PREPARE);
1021+
1022+
/* Wait votes from all nodes until: */
1023+
while (!ts->votingCompleted /* all nodes voted */
1024+
&& nConfigChanges == Mtm->nConfigChanges /* configarion is changed */
1025+
&& Mtm->status == MTM_ONLINE /* node is not online */
1026+
&& ts->status != TRANSACTION_STATUS_ABORTED /* transaction is aborted */
1027+
&& start + transTimeout >= MtmGetSystemTime()) /* timeout is expired */
1028+
{
1029+
MtmUnlock();
1030+
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Start");
1031+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
1032+
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Finish");
1033+
/* Emergency bailout if postmaster has died */
1034+
if (result & WL_POSTMASTER_DEATH) {
1035+
proc_exit(1);
1036+
}
1037+
if (result & WL_LATCH_SET) {
1038+
MTM_LOG3("Latch signaled at %ld", MtmGetSystemTime());
1039+
ResetLatch(&MyProc->procLatch);
1040+
}
1041+
MtmLock(LW_EXCLUSIVE);
1042+
}
1043+
if (ts->status != TRANSACTION_STATUS_ABORTED && (!ts->votingCompleted || nConfigChanges != Mtm->nConfigChanges)) {
1044+
if (nConfigChanges != Mtm->nConfigChanges) {
1045+
elog(WARNING, "Transaction %d (%s) is aborted because cluster configuration is changed during commit", x->xid, x->gid);
1046+
} else {
1047+
elog(WARNING, "Transaction %d (%s) is aborted because of %d msec timeout expiration, prepare time %d msec",
1048+
x->xid, x->gid, (int)USEC_TO_MSEC(transTimeout), (int)USEC_TO_MSEC(ts->csn - x->snapshot));
1049+
}
1050+
MtmAbortTransaction(ts);
1051+
}
1052+
x->status = ts->status;
1053+
x->xid = ts->xid;
1054+
x->isPrepared = true;
1055+
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
1056+
}
1057+
MtmUnlock();
1058+
}
9831059

9841060
static void
9851061
MtmAbortPreparedTransaction(MtmCurrentTrans* x)
@@ -1009,9 +1085,9 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
10091085
static void
10101086
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
10111087
{
1012-
MTM_LOG3("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s",
1013-
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->gid, commit ? "commit" : "abort");
1014-
if (x->status != TRANSACTION_STATUS_ABORTED && x->isDistributed && (x->isPrepared || x->isReplicated)) {
1088+
MTM_LOG1("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
1089+
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->isTwoPhase, x->gid, commit ? "commit" : "abort");
1090+
if (x->status != TRANSACTION_STATUS_ABORTED && x->isDistributed && (x->isPrepared || x->isReplicated) && !x->isTwoPhase) {
10151091
MtmTransState* ts = NULL;
10161092
MtmLock(LW_EXCLUSIVE);
10171093
if (x->isPrepared) {
@@ -3820,9 +3896,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38203896
}
38213897
break;
38223898
case TRANS_STMT_PREPARE:
3899+
MtmTx.isTwoPhase = true;
3900+
strcpy(MtmTx.gid, stmt->gid);
3901+
break;
3902+
/* nobreak */
38233903
case TRANS_STMT_COMMIT_PREPARED:
38243904
case TRANS_STMT_ROLLBACK_PREPARED:
3825-
MtmTx.isTwoPhase = true;
3905+
Assert(!MtmTx.isTwoPhase);
38263906
strcpy(MtmTx.gid, stmt->gid);
38273907
break;
38283908
default:

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ typedef struct MtmTransState
213213
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
214214
bool isEnqueued; /* Transaction is inserted in queue */
215215
bool isActive; /* Transaction is active */
216+
bool isTwoPhase; /* user level 2PC */
216217
nodemask_t participantsMask; /* Mask of nodes involved in transaction */
217218
nodemask_t votedMask; /* Mask of voted nodes */
218219
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */

contrib/mmts/pglogical_proto.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
else
187187
Assert(false);
188188

189-
Assert(flags != PGLOGICAL_COMMIT_PREPARED || txn->xid < 1000 || MtmTransactionRecords != 1);
190-
191189
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
192190
// if (MtmIsFilteredTxn) {
193191
// Assert(MtmTransactionRecords == 0);

0 commit comments

Comments
 (0)