Skip to content

Commit c7ed87e

Browse files
committed
Rewrite support if subtransactions
1 parent 610f7c5 commit c7ed87e

File tree

2 files changed

+100
-66
lines changed

2 files changed

+100
-66
lines changed

pg_dtm.c

Lines changed: 89 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ typedef struct DtmTransStatus
4848
{
4949
TransactionId xid;
5050
XidStatus status;
51+
int nSubxids;
5152
cid_t cid;
5253
struct DtmTransStatus* next;
5354
} DtmTransStatus;
@@ -65,6 +66,8 @@ typedef struct
6566
{
6667
char gtid[MAX_GTID_SIZE];
6768
TransactionId xid;
69+
TransactionId* subxids;
70+
int nSubxids;
6871
} DtmTransId;
6972

7073

@@ -75,19 +78,19 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
7578
static HTAB* xid2status;
7679
static HTAB* gtid2xid;
7780
static DtmNodeState* local;
78-
static DtmTransState dtm_tx;
81+
static DtmCurrentTrans dtm_tx;
7982
static uint64 totalSleepInterrupts;
8083
static int DtmVacuumDelay;
8184

8285
static Snapshot DtmGetSnapshot(Snapshot snapshot);
8386
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
8487
static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
8588
static TransactionId DtmAdjustOldestXid(TransactionId xid);
86-
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
8789
static bool DtmDetectGlobalDeadLock(PGPROC* proc);
8890
static cid_t DtmGetCsn(TransactionId xid);
91+
static void DtmAddSubtransactions(DtmTransStatus* ts, TransactionId* subxids, int nSubxids);
8992

90-
static TransactionManager DtmTM = { PgTransactionIdGetStatus, DtmSetTransactionStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot, DtmDetectGlobalDeadLock };
93+
static TransactionManager DtmTM = { PgTransactionIdGetStatus, PgTransactionIdSetTreeStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot, DtmDetectGlobalDeadLock };
9194

9295
void _PG_init(void);
9396
void _PG_fini(void);
@@ -290,6 +293,7 @@ dtm_xact_callback(XactEvent event, void *arg)
290293
break;
291294

292295
case XACT_EVENT_PREPARE:
296+
DtmLocalSavePreparedState(dtm_get_global_trans_id());
293297
DtmLocalEnd(&dtm_tx);
294298
break;
295299

@@ -462,6 +466,7 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
462466
static timestamp_t firstReportTime;
463467
static timestamp_t prevReportTime;
464468
static timestamp_t totalSleepTime;
469+
static timestamp_t maxSleepTime;
465470
#endif
466471
timestamp_t delay = MIN_WAIT_TIMEOUT;
467472
Assert(xid != InvalidTransactionId);
@@ -476,13 +481,6 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
476481
while (true)
477482
{
478483
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
479-
if (ts == NULL &&
480-
!(TransactionIdFollowsOrEquals(xid, snapshot->xmax) || TransactionIdPrecedes(xid, snapshot->xmin)))
481-
{
482-
//TransactionIdFollowsOrEquals(xid, TransactionXmin)) {
483-
TransactionId subxid = SubTransGetTopmostTransaction(xid);
484-
ts = (DtmTransStatus*)hash_search(xid2status, &subxid, HASH_FIND, NULL);
485-
}
486484
if (ts != NULL)
487485
{
488486
if (ts->cid > dtm_tx.snapshot) {
@@ -497,17 +495,21 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
497495
SpinLockRelease(&local->lock);
498496
#if TRACE_SLEEP_TIME
499497
{
500-
timestamp_t now = dtm_get_current_time();
498+
timestamp_t delta, now = dtm_get_current_time();
501499
#endif
502500
dtm_sleep(delay);
503501
#if TRACE_SLEEP_TIME
504-
totalSleepTime += dtm_get_current_time() - now;
505-
if (now > prevReportTime + USEC*1) {
502+
delta = dtm_get_current_time() - now;
503+
totalSleepTime += delta;
504+
if (delta > maxSleepTime) {
505+
maxSleepTime = delta;
506+
}
507+
if (now > prevReportTime + USEC*10) {
506508
prevReportTime = now;
507509
if (firstReportTime == 0) {
508510
firstReportTime = now;
509511
} else {
510-
fprintf(stderr, "Snapshot sleep %lu of %lu usec (%f%%)\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime));
512+
fprintf(stderr, "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime), maxSleepTime);
511513
}
512514
}
513515
}
@@ -577,7 +579,7 @@ void DtmInitialize()
577579
}
578580

579581

580-
void DtmLocalBegin(DtmTransState* x)
582+
void DtmLocalBegin(DtmCurrentTrans* x)
581583
{
582584
if (x->xid == InvalidTransactionId) {
583585
SpinLockAcquire(&local->lock);
@@ -592,30 +594,32 @@ void DtmLocalBegin(DtmTransState* x)
592594
}
593595
}
594596

595-
cid_t DtmLocalExtend(DtmTransState* x, GlobalTransactionId gtid)
597+
cid_t DtmLocalExtend(DtmCurrentTrans* x, GlobalTransactionId gtid)
596598
{
597599
if (gtid != NULL) {
598600
SpinLockAcquire(&local->lock);
599601
{
600602
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
601-
x->is_global = true;
602603
id->xid = x->xid;
604+
id->nSubxids = 0;
605+
id->subxids = 0;
603606
}
604607
SpinLockRelease(&local->lock);
605-
} else {
606-
x->is_global = true;
607-
}
608+
}
609+
x->is_global = true;
608610
return x->snapshot;
609611
}
610612

611-
cid_t DtmLocalAccess(DtmTransState* x, GlobalTransactionId gtid, cid_t global_cid)
613+
cid_t DtmLocalAccess(DtmCurrentTrans* x, GlobalTransactionId gtid, cid_t global_cid)
612614
{
613615
cid_t local_cid;
614616
SpinLockAcquire(&local->lock);
615617
{
616618
if (gtid != NULL) {
617619
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
618620
id->xid = x->xid;
621+
id->nSubxids = 0;
622+
id->subxids = 0;
619623
}
620624
local_cid = dtm_sync(global_cid);
621625
x->snapshot = local_cid;
@@ -638,7 +642,9 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid)
638642
ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
639643
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
640644
ts->cid = dtm_get_cid();
645+
ts->nSubxids = id->nSubxids;
641646
DtmTransactionListAppend(ts);
647+
DtmAddSubtransactions(ts, id->subxids, id->nSubxids);
642648
}
643649
SpinLockRelease(&local->lock);
644650
}
@@ -661,22 +667,26 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
661667
{
662668
DtmTransStatus* ts;
663669
DtmTransId* id;
670+
int i;
664671

665672
id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL);
666673
Assert(id != NULL);
667674

668675
ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_FIND, NULL);
669676
Assert(ts != NULL);
670677
ts->cid = cid;
671-
678+
for (i = 0; i < ts->nSubxids; i++) {
679+
ts = ts->next;
680+
ts->cid = cid;
681+
}
672682
dtm_sync(cid);
673683

674684
DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
675685
}
676686
SpinLockRelease(&local->lock);
677687
}
678688

679-
void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
689+
void DtmLocalCommitPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid)
680690
{
681691
Assert(gtid != NULL);
682692

@@ -688,33 +698,45 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
688698
x->is_global = true;
689699
x->is_prepared = true;
690700
x->xid = id->xid;
701+
free(id->subxids);
702+
691703
DTM_TRACE((stderr, "Global transaction %u(%s) is precommitted\n", x->xid, gtid));
692704
}
693705
SpinLockRelease(&local->lock);
694706
}
695707

696-
void DtmLocalCommit(DtmTransState* x)
708+
void DtmLocalCommit(DtmCurrentTrans* x)
697709
{
698710
SpinLockAcquire(&local->lock);
699711
{
700712
bool found;
701713
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &x->xid, HASH_ENTER, &found);
714+
ts->status = TRANSACTION_STATUS_COMMITTED;
702715
if (x->is_prepared) {
716+
int i;
717+
DtmTransStatus* sts = ts;
703718
Assert(found);
704719
Assert(x->is_global);
705-
} else if (!found) {
706-
//Assert(!found);
720+
for (i = 0; i < ts->nSubxids; i++) {
721+
sts = sts->next;
722+
Assert(sts->cid == ts->cid);
723+
sts->status = TRANSACTION_STATUS_COMMITTED;
724+
}
725+
} else {
726+
TransactionId* subxids;
727+
Assert(!found);
707728
ts->cid = dtm_get_cid();
708729
DtmTransactionListAppend(ts);
730+
ts->nSubxids = xactGetCommittedChildren(&subxids);
731+
DtmAddSubtransactions(ts, subxids, ts->nSubxids);
709732
}
710733
x->cid = ts->cid;
711-
ts->status = TRANSACTION_STATUS_COMMITTED;
712734
DTM_TRACE((stderr, "Local transaction %u is committed at %lu\n", x->xid, x->cid));
713735
}
714736
SpinLockRelease(&local->lock);
715737
}
716738

717-
void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
739+
void DtmLocalAbortPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid)
718740
{
719741
Assert (gtid != NULL);
720742

@@ -726,13 +748,14 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
726748
x->is_global = true;
727749
x->is_prepared = true;
728750
x->xid = id->xid;
751+
free(id->subxids);
729752

730753
DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
731754
}
732755
SpinLockRelease(&local->lock);
733756
}
734757

735-
void DtmLocalAbort(DtmTransState* x)
758+
void DtmLocalAbort(DtmCurrentTrans* x)
736759
{
737760
SpinLockAcquire(&local->lock);
738761
{
@@ -741,9 +764,10 @@ void DtmLocalAbort(DtmTransState* x)
741764
if (x->is_prepared) {
742765
Assert(found);
743766
Assert(x->is_global);
744-
} else if (!found) {
745-
//Assert(!found);
767+
} else {
768+
Assert(!found);
746769
ts->cid = dtm_get_cid();
770+
ts->nSubxids = 0;
747771
DtmTransactionListAppend(ts);
748772
}
749773
x->cid = ts->cid;
@@ -753,39 +777,14 @@ void DtmLocalAbort(DtmTransState* x)
753777
SpinLockRelease(&local->lock);
754778
}
755779

756-
void DtmLocalEnd(DtmTransState* x)
780+
void DtmLocalEnd(DtmCurrentTrans* x)
757781
{
758782
x->is_global = false;
759783
x->is_prepared = false;
760784
x->xid = InvalidTransactionId;
761785
x->cid = INVALID_CID;
762786
}
763787

764-
void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
765-
{
766-
if (nsubxids != 0) {
767-
SpinLockAcquire(&local->lock);
768-
{
769-
int i;
770-
bool found;
771-
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_ENTER, &found);
772-
if (!found) {
773-
ts->cid = dtm_get_cid();
774-
}
775-
ts->status = status;
776-
for (i = 0; i < nsubxids; i++) {
777-
DtmTransStatus* sts = (DtmTransStatus*)hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
778-
Assert(!found);
779-
sts->status = status;
780-
sts->cid = ts->cid;
781-
DtmTransactionListInsertAfter(ts, sts);
782-
}
783-
}
784-
SpinLockRelease(&local->lock);
785-
}
786-
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
787-
}
788-
789788
bool DtmDetectGlobalDeadLock(PGPROC* proc)
790789
{
791790
elog(WARNING, "Global deadlock?");
@@ -806,3 +805,36 @@ static cid_t DtmGetCsn(TransactionId xid)
806805
return csn;
807806
}
808807

808+
void DtmLocalSavePreparedState(GlobalTransactionId gtid)
809+
{
810+
if (gtid != NULL) {
811+
SpinLockAcquire(&local->lock);
812+
{
813+
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL);
814+
if (id != NULL) {
815+
TransactionId* subxids;
816+
int nSubxids = xactGetCommittedChildren(&subxids);
817+
if (nSubxids != 0) {
818+
id->subxids = (TransactionId*)malloc(nSubxids*sizeof(TransactionId));
819+
id->nSubxids = nSubxids;
820+
memcpy(id->subxids, subxids, nSubxids*sizeof(TransactionId));
821+
}
822+
}
823+
}
824+
SpinLockRelease(&local->lock);
825+
}
826+
}
827+
828+
static void DtmAddSubtransactions(DtmTransStatus* ts, TransactionId* subxids, int nSubxids)
829+
{
830+
int i;
831+
for (i = 0; i < nSubxids; i++) {
832+
bool found;
833+
DtmTransStatus* sts = (DtmTransStatus*)hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
834+
Assert(!found);
835+
sts->status = ts->status;
836+
sts->cid = ts->cid;
837+
sts->nSubxids = 0;
838+
DtmTransactionListInsertAfter(ts, sts);
839+
}
840+
}

pg_dtm.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,35 @@ typedef struct {
1010
bool is_prepared;
1111
cid_t cid;
1212
cid_t snapshot;
13-
} DtmTransState;
13+
} DtmCurrentTrans;
1414

1515
typedef char const* GlobalTransactionId;
1616

1717
/* Initialize DTM extension */
1818
void DtmInitialize(void);
1919
/* Invoked at start of any local or global transaction */
20-
void DtmLocalBegin(DtmTransState* x);
20+
void DtmLocalBegin(DtmCurrentTrans* x);
2121
/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
22-
cid_t DtmLocalExtend(DtmTransState* x, GlobalTransactionId gtid);
22+
cid_t DtmLocalExtend(DtmCurrentTrans* x, GlobalTransactionId gtid);
2323
/* Function called at first access to any datanode except first one involved in distributed transaction */
24-
cid_t DtmLocalAccess(DtmTransState* x, GlobalTransactionId gtid, cid_t snapshot);
24+
cid_t DtmLocalAccess(DtmCurrentTrans* x, GlobalTransactionId gtid, cid_t snapshot);
2525
/* Mark transaction as in-doubt */
2626
void DtmLocalBeginPrepare(GlobalTransactionId gtid);
2727
/* Choose CSN for global transaction */
2828
cid_t DtmLocalPrepare(GlobalTransactionId gtid, cid_t cid);
2929
/* Assign CSN to global transaction */
3030
void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid);
3131
/* Do local commit of global transaction */
32-
void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid);
32+
void DtmLocalCommitPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid);
3333
/* Do local abort of global transaction */
34-
void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid);
34+
void DtmLocalAbortPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid);
3535
/* Do local commit of global transaction */
36-
void DtmLocalCommit(DtmTransState* x);
36+
void DtmLocalCommit(DtmCurrentTrans* x);
3737
/* Do local abort of global transaction */
38-
void DtmLocalAbort(DtmTransState* x);
38+
void DtmLocalAbort(DtmCurrentTrans* x);
3939
/* Invoked at the end of any local or global transaction: free transaction state */
40-
void DtmLocalEnd(DtmTransState* x);
40+
void DtmLocalEnd(DtmCurrentTrans* x);
41+
/* Save global preapred transactoin state */
42+
void DtmLocalSavePreparedState(GlobalTransactionId gtid);
4143

4244
#endif

0 commit comments

Comments
 (0)