@@ -48,6 +48,7 @@ typedef struct DtmTransStatus
48
48
{
49
49
TransactionId xid ;
50
50
XidStatus status ;
51
+ int nSubxids ;
51
52
cid_t cid ;
52
53
struct DtmTransStatus * next ;
53
54
} DtmTransStatus ;
@@ -65,6 +66,8 @@ typedef struct
65
66
{
66
67
char gtid [MAX_GTID_SIZE ];
67
68
TransactionId xid ;
69
+ TransactionId * subxids ;
70
+ int nSubxids ;
68
71
} DtmTransId ;
69
72
70
73
@@ -75,19 +78,19 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
75
78
static HTAB * xid2status ;
76
79
static HTAB * gtid2xid ;
77
80
static DtmNodeState * local ;
78
- static DtmTransState dtm_tx ;
81
+ static DtmCurrentTrans dtm_tx ;
79
82
static uint64 totalSleepInterrupts ;
80
83
static int DtmVacuumDelay ;
81
84
82
85
static Snapshot DtmGetSnapshot (Snapshot snapshot );
83
86
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum );
84
87
static bool DtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot );
85
88
static TransactionId DtmAdjustOldestXid (TransactionId xid );
86
- static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
87
89
static bool DtmDetectGlobalDeadLock (PGPROC * proc );
88
90
static cid_t DtmGetCsn (TransactionId xid );
91
+ static void DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids );
89
92
90
- static TransactionManager DtmTM = { PgTransactionIdGetStatus , DtmSetTransactionStatus , DtmGetSnapshot , PgGetNewTransactionId , DtmGetOldestXmin , PgTransactionIdIsInProgress , PgGetGlobalTransactionId , DtmXidInMVCCSnapshot , DtmDetectGlobalDeadLock };
93
+ static TransactionManager DtmTM = { PgTransactionIdGetStatus , PgTransactionIdSetTreeStatus , DtmGetSnapshot , PgGetNewTransactionId , DtmGetOldestXmin , PgTransactionIdIsInProgress , PgGetGlobalTransactionId , DtmXidInMVCCSnapshot , DtmDetectGlobalDeadLock };
91
94
92
95
void _PG_init (void );
93
96
void _PG_fini (void );
@@ -290,6 +293,7 @@ dtm_xact_callback(XactEvent event, void *arg)
290
293
break ;
291
294
292
295
case XACT_EVENT_PREPARE :
296
+ DtmLocalSavePreparedState (dtm_get_global_trans_id ());
293
297
DtmLocalEnd (& dtm_tx );
294
298
break ;
295
299
@@ -462,6 +466,7 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
462
466
static timestamp_t firstReportTime ;
463
467
static timestamp_t prevReportTime ;
464
468
static timestamp_t totalSleepTime ;
469
+ static timestamp_t maxSleepTime ;
465
470
#endif
466
471
timestamp_t delay = MIN_WAIT_TIMEOUT ;
467
472
Assert (xid != InvalidTransactionId );
@@ -476,13 +481,6 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
476
481
while (true)
477
482
{
478
483
DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status , & xid , HASH_FIND , NULL );
479
- if (ts == NULL &&
480
- !(TransactionIdFollowsOrEquals (xid , snapshot -> xmax ) || TransactionIdPrecedes (xid , snapshot -> xmin )))
481
- {
482
- //TransactionIdFollowsOrEquals(xid, TransactionXmin)) {
483
- TransactionId subxid = SubTransGetTopmostTransaction (xid );
484
- ts = (DtmTransStatus * )hash_search (xid2status , & subxid , HASH_FIND , NULL );
485
- }
486
484
if (ts != NULL )
487
485
{
488
486
if (ts -> cid > dtm_tx .snapshot ) {
@@ -497,17 +495,21 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
497
495
SpinLockRelease (& local -> lock );
498
496
#if TRACE_SLEEP_TIME
499
497
{
500
- timestamp_t now = dtm_get_current_time ();
498
+ timestamp_t delta , now = dtm_get_current_time ();
501
499
#endif
502
500
dtm_sleep (delay );
503
501
#if TRACE_SLEEP_TIME
504
- totalSleepTime += dtm_get_current_time () - now ;
505
- if (now > prevReportTime + USEC * 1 ) {
502
+ delta = dtm_get_current_time () - now ;
503
+ totalSleepTime += delta ;
504
+ if (delta > maxSleepTime ) {
505
+ maxSleepTime = delta ;
506
+ }
507
+ if (now > prevReportTime + USEC * 10 ) {
506
508
prevReportTime = now ;
507
509
if (firstReportTime == 0 ) {
508
510
firstReportTime = now ;
509
511
} else {
510
- fprintf (stderr , "Snapshot sleep %lu of %lu usec (%f%%)\n" , totalSleepTime , now - firstReportTime , totalSleepTime * 100.0 /(now - firstReportTime ));
512
+ fprintf (stderr , "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu \n" , totalSleepTime , now - firstReportTime , totalSleepTime * 100.0 /(now - firstReportTime ), maxSleepTime );
511
513
}
512
514
}
513
515
}
@@ -577,7 +579,7 @@ void DtmInitialize()
577
579
}
578
580
579
581
580
- void DtmLocalBegin (DtmTransState * x )
582
+ void DtmLocalBegin (DtmCurrentTrans * x )
581
583
{
582
584
if (x -> xid == InvalidTransactionId ) {
583
585
SpinLockAcquire (& local -> lock );
@@ -592,30 +594,32 @@ void DtmLocalBegin(DtmTransState* x)
592
594
}
593
595
}
594
596
595
- cid_t DtmLocalExtend (DtmTransState * x , GlobalTransactionId gtid )
597
+ cid_t DtmLocalExtend (DtmCurrentTrans * x , GlobalTransactionId gtid )
596
598
{
597
599
if (gtid != NULL ) {
598
600
SpinLockAcquire (& local -> lock );
599
601
{
600
602
DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_ENTER , NULL );
601
- x -> is_global = true;
602
603
id -> xid = x -> xid ;
604
+ id -> nSubxids = 0 ;
605
+ id -> subxids = 0 ;
603
606
}
604
607
SpinLockRelease (& local -> lock );
605
- } else {
606
- x -> is_global = true;
607
- }
608
+ }
609
+ x -> is_global = true;
608
610
return x -> snapshot ;
609
611
}
610
612
611
- cid_t DtmLocalAccess (DtmTransState * x , GlobalTransactionId gtid , cid_t global_cid )
613
+ cid_t DtmLocalAccess (DtmCurrentTrans * x , GlobalTransactionId gtid , cid_t global_cid )
612
614
{
613
615
cid_t local_cid ;
614
616
SpinLockAcquire (& local -> lock );
615
617
{
616
618
if (gtid != NULL ) {
617
619
DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_ENTER , NULL );
618
620
id -> xid = x -> xid ;
621
+ id -> nSubxids = 0 ;
622
+ id -> subxids = 0 ;
619
623
}
620
624
local_cid = dtm_sync (global_cid );
621
625
x -> snapshot = local_cid ;
@@ -638,7 +642,9 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid)
638
642
ts = (DtmTransStatus * )hash_search (xid2status , & id -> xid , HASH_ENTER , NULL );
639
643
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
640
644
ts -> cid = dtm_get_cid ();
645
+ ts -> nSubxids = id -> nSubxids ;
641
646
DtmTransactionListAppend (ts );
647
+ DtmAddSubtransactions (ts , id -> subxids , id -> nSubxids );
642
648
}
643
649
SpinLockRelease (& local -> lock );
644
650
}
@@ -661,22 +667,26 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
661
667
{
662
668
DtmTransStatus * ts ;
663
669
DtmTransId * id ;
670
+ int i ;
664
671
665
672
id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_FIND , NULL );
666
673
Assert (id != NULL );
667
674
668
675
ts = (DtmTransStatus * )hash_search (xid2status , & id -> xid , HASH_FIND , NULL );
669
676
Assert (ts != NULL );
670
677
ts -> cid = cid ;
671
-
678
+ for (i = 0 ; i < ts -> nSubxids ; i ++ ) {
679
+ ts = ts -> next ;
680
+ ts -> cid = cid ;
681
+ }
672
682
dtm_sync (cid );
673
683
674
684
DTM_TRACE ((stderr , "Prepare transaction %u(%s) with CSN %lu\n" , id -> xid , gtid , cid ));
675
685
}
676
686
SpinLockRelease (& local -> lock );
677
687
}
678
688
679
- void DtmLocalCommitPrepared (DtmTransState * x , GlobalTransactionId gtid )
689
+ void DtmLocalCommitPrepared (DtmCurrentTrans * x , GlobalTransactionId gtid )
680
690
{
681
691
Assert (gtid != NULL );
682
692
@@ -688,33 +698,45 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
688
698
x -> is_global = true;
689
699
x -> is_prepared = true;
690
700
x -> xid = id -> xid ;
701
+ free (id -> subxids );
702
+
691
703
DTM_TRACE ((stderr , "Global transaction %u(%s) is precommitted\n" , x -> xid , gtid ));
692
704
}
693
705
SpinLockRelease (& local -> lock );
694
706
}
695
707
696
- void DtmLocalCommit (DtmTransState * x )
708
+ void DtmLocalCommit (DtmCurrentTrans * x )
697
709
{
698
710
SpinLockAcquire (& local -> lock );
699
711
{
700
712
bool found ;
701
713
DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status , & x -> xid , HASH_ENTER , & found );
714
+ ts -> status = TRANSACTION_STATUS_COMMITTED ;
702
715
if (x -> is_prepared ) {
716
+ int i ;
717
+ DtmTransStatus * sts = ts ;
703
718
Assert (found );
704
719
Assert (x -> is_global );
705
- } else if (!found ) {
706
- //Assert(!found);
720
+ for (i = 0 ; i < ts -> nSubxids ; i ++ ) {
721
+ sts = sts -> next ;
722
+ Assert (sts -> cid == ts -> cid );
723
+ sts -> status = TRANSACTION_STATUS_COMMITTED ;
724
+ }
725
+ } else {
726
+ TransactionId * subxids ;
727
+ Assert (!found );
707
728
ts -> cid = dtm_get_cid ();
708
729
DtmTransactionListAppend (ts );
730
+ ts -> nSubxids = xactGetCommittedChildren (& subxids );
731
+ DtmAddSubtransactions (ts , subxids , ts -> nSubxids );
709
732
}
710
733
x -> cid = ts -> cid ;
711
- ts -> status = TRANSACTION_STATUS_COMMITTED ;
712
734
DTM_TRACE ((stderr , "Local transaction %u is committed at %lu\n" , x -> xid , x -> cid ));
713
735
}
714
736
SpinLockRelease (& local -> lock );
715
737
}
716
738
717
- void DtmLocalAbortPrepared (DtmTransState * x , GlobalTransactionId gtid )
739
+ void DtmLocalAbortPrepared (DtmCurrentTrans * x , GlobalTransactionId gtid )
718
740
{
719
741
Assert (gtid != NULL );
720
742
@@ -726,13 +748,14 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
726
748
x -> is_global = true;
727
749
x -> is_prepared = true;
728
750
x -> xid = id -> xid ;
751
+ free (id -> subxids );
729
752
730
753
DTM_TRACE ((stderr , "Global transaction %u(%s) is preaborted\n" , x -> xid , gtid ));
731
754
}
732
755
SpinLockRelease (& local -> lock );
733
756
}
734
757
735
- void DtmLocalAbort (DtmTransState * x )
758
+ void DtmLocalAbort (DtmCurrentTrans * x )
736
759
{
737
760
SpinLockAcquire (& local -> lock );
738
761
{
@@ -741,9 +764,10 @@ void DtmLocalAbort(DtmTransState* x)
741
764
if (x -> is_prepared ) {
742
765
Assert (found );
743
766
Assert (x -> is_global );
744
- } else if (! found ) {
745
- // Assert(!found);
767
+ } else {
768
+ Assert (!found );
746
769
ts -> cid = dtm_get_cid ();
770
+ ts -> nSubxids = 0 ;
747
771
DtmTransactionListAppend (ts );
748
772
}
749
773
x -> cid = ts -> cid ;
@@ -753,39 +777,14 @@ void DtmLocalAbort(DtmTransState* x)
753
777
SpinLockRelease (& local -> lock );
754
778
}
755
779
756
- void DtmLocalEnd (DtmTransState * x )
780
+ void DtmLocalEnd (DtmCurrentTrans * x )
757
781
{
758
782
x -> is_global = false;
759
783
x -> is_prepared = false;
760
784
x -> xid = InvalidTransactionId ;
761
785
x -> cid = INVALID_CID ;
762
786
}
763
787
764
- void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn )
765
- {
766
- if (nsubxids != 0 ) {
767
- SpinLockAcquire (& local -> lock );
768
- {
769
- int i ;
770
- bool found ;
771
- DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status , & xid , HASH_ENTER , & found );
772
- if (!found ) {
773
- ts -> cid = dtm_get_cid ();
774
- }
775
- ts -> status = status ;
776
- for (i = 0 ; i < nsubxids ; i ++ ) {
777
- DtmTransStatus * sts = (DtmTransStatus * )hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
778
- Assert (!found );
779
- sts -> status = status ;
780
- sts -> cid = ts -> cid ;
781
- DtmTransactionListInsertAfter (ts , sts );
782
- }
783
- }
784
- SpinLockRelease (& local -> lock );
785
- }
786
- PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
787
- }
788
-
789
788
bool DtmDetectGlobalDeadLock (PGPROC * proc )
790
789
{
791
790
elog (WARNING , "Global deadlock?" );
@@ -806,3 +805,36 @@ static cid_t DtmGetCsn(TransactionId xid)
806
805
return csn ;
807
806
}
808
807
808
+ void DtmLocalSavePreparedState (GlobalTransactionId gtid )
809
+ {
810
+ if (gtid != NULL ) {
811
+ SpinLockAcquire (& local -> lock );
812
+ {
813
+ DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_FIND , NULL );
814
+ if (id != NULL ) {
815
+ TransactionId * subxids ;
816
+ int nSubxids = xactGetCommittedChildren (& subxids );
817
+ if (nSubxids != 0 ) {
818
+ id -> subxids = (TransactionId * )malloc (nSubxids * sizeof (TransactionId ));
819
+ id -> nSubxids = nSubxids ;
820
+ memcpy (id -> subxids , subxids , nSubxids * sizeof (TransactionId ));
821
+ }
822
+ }
823
+ }
824
+ SpinLockRelease (& local -> lock );
825
+ }
826
+ }
827
+
828
+ static void DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids )
829
+ {
830
+ int i ;
831
+ for (i = 0 ; i < nSubxids ; i ++ ) {
832
+ bool found ;
833
+ DtmTransStatus * sts = (DtmTransStatus * )hash_search (xid2status , & subxids [i ], HASH_ENTER , & found );
834
+ Assert (!found );
835
+ sts -> status = ts -> status ;
836
+ sts -> cid = ts -> cid ;
837
+ sts -> nSubxids = 0 ;
838
+ DtmTransactionListInsertAfter (ts , sts );
839
+ }
840
+ }
0 commit comments