Skip to content

Commit d9e190e

Browse files
committed
Support subtransactions
1 parent fc142ba commit d9e190e

File tree

1 file changed

+36
-9
lines changed

1 file changed

+36
-9
lines changed

pg_dtm.c

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
8383
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
8484
static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
8585
static TransactionId DtmAdjustOldestXid(TransactionId xid);
86+
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
8687

87-
static TransactionManager DtmTM = { PgTransactionIdGetStatus, PgTransactionIdSetTreeStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot };
88+
static TransactionManager DtmTM = { PgTransactionIdGetStatus, DtmSetTransactionStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot };
8889

8990
void _PG_init(void);
9091
void _PG_fini(void);
@@ -456,10 +457,11 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
456457
#endif
457458
while (true)
458459
{
459-
DtmTransStatus* ts;
460-
TransactionId subxid = xid;
461-
while ((ts = (DtmTransStatus*)hash_search(xid2status, &subxid, HASH_FIND, NULL)) == NULL
462-
&& TransactionIdIsValid(subxid = SubTransGetParent(subxid)));
460+
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
461+
if (ts == NULL && TransactionIdFollowsOrEquals(xid, TransactionXmin)) {
462+
xid = SubTransGetTopmostTransaction(xid);
463+
ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
464+
}
463465
if (ts != NULL)
464466
{
465467
if (ts->cid > dtm_tx.snapshot) {
@@ -679,8 +681,8 @@ void DtmLocalCommit(DtmTransState* x)
679681
if (x->is_prepared) {
680682
Assert(found);
681683
Assert(x->is_global);
682-
} else {
683-
Assert(!found);
684+
} else if (!found) {
685+
//Assert(!found);
684686
ts->cid = dtm_get_cid();
685687
IncludeInTransactionList(ts);
686688
}
@@ -718,8 +720,8 @@ void DtmLocalAbort(DtmTransState* x)
718720
if (x->is_prepared) {
719721
Assert(found);
720722
Assert(x->is_global);
721-
} else {
722-
Assert(!found);
723+
} else if (!found) {
724+
//Assert(!found);
723725
ts->cid = dtm_get_cid();
724726
IncludeInTransactionList(ts);
725727
}
@@ -738,3 +740,28 @@ void DtmLocalEnd(DtmTransState* x)
738740
x->cid = INVALID_CID;
739741
}
740742

743+
void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
744+
{
745+
if (nsubxids != 0) {
746+
SpinLockAcquire(&local->lock);
747+
{
748+
int i;
749+
bool found;
750+
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_ENTER, &found);
751+
if (!found) {
752+
ts->cid = dtm_get_cid();
753+
}
754+
ts->status = status;
755+
for (i = 0; i < nsubxids; i++) {
756+
DtmTransStatus* sts = (DtmTransStatus*)hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
757+
Assert(!found);
758+
sts->status = status;
759+
sts->cid = ts->cid;
760+
sts->next = ts->next;
761+
ts->next = sts;
762+
}
763+
}
764+
SpinLockRelease(&local->lock);
765+
}
766+
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
767+
}

0 commit comments

Comments
 (0)