Skip to content

Commit a9c1f86

Browse files
committed
Add GetPreparedTransactionState function
1 parent b094bb5 commit a9c1f86

File tree

4 files changed

+34
-6
lines changed

4 files changed

+34
-6
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
2727
AS 'MODULE_PATHNAME','mtm_get_snapshot'
2828
LANGUAGE C;
2929

30-
CREATE FUNCTION mtm.get_csn(tid xid) RETURNS bigint
30+
CREATE FUNCTION mtm.get_csn(integer xid) RETURNS bigint
3131
AS 'MODULE_PATHNAME','mtm_get_csn'
3232
LANGUAGE C;
3333

@@ -51,7 +51,7 @@ CREATE FUNCTION mtm.get_trans_by_gid(git text) RETURNS mtm.trans_state
5151
AS 'MODULE_PATHNAME','mtm_get_trans_by_gid'
5252
LANGUAGE C;
5353

54-
CREATE FUNCTION mtm.get_trans_by_xid(tid xid) RETURNS mtm.trans_state
54+
CREATE FUNCTION mtm.get_trans_by_xid(integer xid) RETURNS mtm.trans_state
5555
AS 'MODULE_PATHNAME','mtm_get_trans_by_xid'
5656
LANGUAGE C;
5757

contrib/mmts/multimaster.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,11 +1199,13 @@ static void
11991199
MtmLogAbortLogicalMessage(int nodeId, char const* gid)
12001200
{
12011201
MtmAbortLogicalMessage msg;
1202+
XLogRecPtr lsn;
12021203
strcpy(msg.gid, gid);
12031204
msg.origin_node = nodeId;
12041205
msg.origin_lsn = replorigin_session_origin_lsn;
1205-
MTM_LOG2("[TRACE] MtmLogAbortLogicalMessage(%d, %s)", nodeId, gid);
1206-
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
1206+
lsn = LogLogicalMessage("A", (char*)&msg, sizeof msg, false);
1207+
XLogFlush(lsn);
1208+
MTM_LOG1("MtmLogAbortLogicalMessage node=%d transaction=%s lsn=%lx", nodeId, gid, lsn);
12071209
}
12081210

12091211
static void
@@ -1260,7 +1262,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12601262
* Send notification only if ABORT happens during transaction processing at replicas,
12611263
* do not send notification if ABORT is received from master
12621264
*/
1263-
MTM_LOG1("%d: send ABORT notification for transaction %d to coordinator %d", MyProcPid, x->gtid.xid, x->gtid.node);
1265+
MTM_LOG1("%d: send ABORT notification for transaction %d (%s) to coordinator %d", MyProcPid, x->gtid.xid, x->gid, x->gtid.node);
12641266
if (ts == NULL) {
12651267
bool found;
12661268
Assert(TransactionIdIsValid(x->xid));
@@ -1408,7 +1410,7 @@ static void MtmLoadPreparedTransactions(void)
14081410
bool found;
14091411
char const* gid = pxacts[i].gid;
14101412
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_ENTER, &found);
1411-
if (!found) {
1413+
if (!found || tm->state == NULL) {
14121414
TransactionId xid = GetNewTransactionId(false);
14131415
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_ENTER, &found);
14141416
MTM_LOG1("Recover prepared transaction %s xid=%d state=%s", gid, xid, pxacts[i].state_3pc);
@@ -1532,9 +1534,11 @@ XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_stat
15321534
}
15331535
if (tm->state != NULL && old_status == TRANSACTION_STATUS_IN_PROGRESS) {
15341536
/* Return UNKNOWN to mark that transaction was prepared */
1537+
MTM_LOG1("Change status of in-progress transaction %s to %s", gid, MtmTxtStatusMnem[new_status]);
15351538
old_status = TRANSACTION_STATUS_UNKNOWN;
15361539
}
15371540
} else {
1541+
MTM_LOG1("Set status of unknown transaction %s to %s", gid, MtmTxtStatusMnem[new_status]);
15381542
tm->state = NULL;
15391543
tm->status = new_status;
15401544
}
@@ -2996,6 +3000,8 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
29963000
CommitTransactionCommand();
29973001
MtmEndSession(nodeId, true);
29983002
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
3003+
char state3pc[MAX_3PC_STATE_SIZE];
3004+
Assert(!GetPreparedTransactionState(gid, state3pc));
29993005
MtmBeginSession(nodeId);
30003006
MtmLogAbortLogicalMessage(nodeId, gid);
30013007
MtmEndSession(nodeId, true);

src/backend/access/transam/twophase.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,26 @@ GetPreparedTransactionList(GlobalTransaction *gxacts)
789789
return num;
790790
}
791791

792+
bool GetPreparedTransactionState(char const* gid, char* state)
793+
{
794+
int i;
795+
GlobalTransaction gxact;
796+
797+
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
798+
i = string_hash(gid, 0) % max_prepared_xacts;
799+
for (gxact = TwoPhaseState->hashTable[i]; gxact != NULL; gxact = gxact->next)
800+
{
801+
if (strcmp(gxact->gid, gid) == 0)
802+
{
803+
strcpy(state, gxact->state_3pc);
804+
return true;
805+
}
806+
}
807+
LWLockRelease(TwoPhaseStateLock);
808+
return false;
809+
}
810+
811+
792812
/*
793813
* SetPrepareTransactionState
794814
* Alter 3PC state of prepared transaction

src/include/access/twophase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,6 @@ extern int GetPreparedTransactions(PreparedTransaction* pxacts);
7575

7676
extern void SetPreparedTransactionState(char const* gid, char const* state);
7777

78+
extern bool GetPreparedTransactionState(char const* gid, char* state);
79+
7880
#endif /* TWOPHASE_H */

0 commit comments

Comments
 (0)