@@ -87,7 +87,10 @@ static bool DtmGlobalXidAssigned;
87
87
static int DtmLocalXidReserve ;
88
88
static int DtmCurcid ;
89
89
static Snapshot DtmLastSnapshot ;
90
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin , TransactionIdIsRunning , DtmGetGlobalTransactionId };
90
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin , PgTransactionIdIsInProgress , DtmGetGlobalTransactionId , PgXidInMVCCSnapshot };
91
+
92
+ static char * DtmHost ;
93
+ static int DtmPort ;
91
94
92
95
93
96
#define XTM_TRACE (fmt , ...)
@@ -169,7 +172,7 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
169
172
* Check that global and local snapshots are consistent: transactions marked as completed in global snapohsot
170
173
* should be completed locally
171
174
*/
172
- dst = GetLocalSnapshotData (dst );
175
+ dst = PgGetSnapshotData (dst );
173
176
for (i = 0 ; i < dst -> xcnt ; i ++ ) {
174
177
if (TransactionIdIsInDoubt (dst -> xip [i ])) {
175
178
goto GetLocalSnapshot ;
@@ -213,7 +216,7 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
213
216
*/
214
217
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum )
215
218
{
216
- TransactionId localXmin = GetOldestLocalXmin (rel , ignoreVacuum );
219
+ TransactionId localXmin = PgGetOldestXmin (rel , ignoreVacuum );
217
220
TransactionId globalXmin = dtm -> minXid ;
218
221
XTM_INFO ("XTM: DtmGetOldestXmin localXmin=%d, globalXmin=%d\n" , localXmin , globalXmin );
219
222
@@ -526,7 +529,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
526
529
* which PRECEDS actual transaction for which Xid is received.
527
530
* This transaction doesn't need to take in accountn global snapshot
528
531
*/
529
- return GetLocalSnapshotData (snapshot );
532
+ return PgGetSnapshotData (snapshot );
530
533
}
531
534
if (TransactionIdIsValid (DtmNextXid ) && snapshot != & CatalogSnapshotData ) {
532
535
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot -> curcid )) {
@@ -543,7 +546,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
543
546
}
544
547
} else {
545
548
/* For local transactions and catalog snapshots use default GetSnapshotData implementation */
546
- snapshot = GetLocalSnapshotData (snapshot );
549
+ snapshot = PgGetSnapshotData (snapshot );
547
550
}
548
551
DtmUpdateRecentXmin (snapshot );
549
552
CurrentTransactionSnapshot = snapshot ;
@@ -557,7 +560,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
557
560
*/
558
561
XidStatus status = xid >= ShmemVariableCache -> nextXid
559
562
? TRANSACTION_STATUS_IN_PROGRESS
560
- : CLOGTransactionIdGetStatus (xid , lsn );
563
+ : PgTransactionIdGetStatus (xid , lsn );
561
564
XTM_TRACE ("XTM: DtmGetTransactionStatus\n" );
562
565
return status ;
563
566
}
@@ -569,7 +572,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
569
572
if (!DtmGlobalXidAssigned && TransactionIdIsValid (DtmNextXid )) {
570
573
CurrentTransactionSnapshot = NULL ;
571
574
if (status == TRANSACTION_STATUS_ABORTED ) {
572
- CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
575
+ PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
573
576
DtmGlobalSetTransStatus (xid , status , false);
574
577
XTM_INFO ("Abort transaction %d\n" , xid );
575
578
return ;
@@ -592,7 +595,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
592
595
status = gs ;
593
596
}
594
597
}
595
- CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
598
+ PgTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
596
599
}
597
600
598
601
static uint32 dtm_xid_hash_fn (const void * key , Size keysize )
@@ -713,6 +716,36 @@ _PG_init(void)
713
716
NULL
714
717
);
715
718
719
+ DefineCustomStringVariable (
720
+ "dtm.host" ,
721
+ "The host where DTM daemon resides" ,
722
+ NULL ,
723
+ & DtmHost ,
724
+ "127.0.0.1" ,
725
+ PGC_BACKEND , // context
726
+ 0 , // flags,
727
+ NULL , // GucStringCheckHook check_hook,
728
+ NULL , // GucStringAssignHook assign_hook,
729
+ NULL // GucShowHook show_hook
730
+ );
731
+
732
+ DefineCustomIntVariable (
733
+ "dtm.port" ,
734
+ "The port DTM daemon is listening" ,
735
+ NULL ,
736
+ & DtmPort ,
737
+ 5431 ,
738
+ 1 ,
739
+ INT_MAX ,
740
+ PGC_BACKEND ,
741
+ 0 ,
742
+ NULL ,
743
+ NULL ,
744
+ NULL
745
+ );
746
+
747
+ TuneToDtm (DtmHost , DtmPort );
748
+
716
749
/*
717
750
* Install hooks.
718
751
*/
@@ -748,6 +781,7 @@ PG_FUNCTION_INFO_V1(dtm_begin_transaction);
748
781
PG_FUNCTION_INFO_V1 (dtm_join_transaction );
749
782
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmax );
750
783
PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmin );
784
+ PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xcnt );
751
785
752
786
Datum
753
787
dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
@@ -761,11 +795,19 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
761
795
PG_RETURN_INT32 (CurrentTransactionSnapshot -> xmax );
762
796
}
763
797
798
+ Datum
799
+ dtm_get_current_snapshot_xcnt (PG_FUNCTION_ARGS )
800
+ {
801
+ PG_RETURN_INT32 (CurrentTransactionSnapshot -> xcnt );
802
+ }
803
+
764
804
Datum
765
805
dtm_begin_transaction (PG_FUNCTION_ARGS )
766
806
{
767
807
Assert (!TransactionIdIsValid (DtmNextXid ));
768
-
808
+ if (dtm == NULL ) {
809
+ elog (ERROR , "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf" );
810
+ }
769
811
DtmNextXid = DtmGlobalStartTransaction (& DtmSnapshot , & dtm -> minXid );
770
812
Assert (TransactionIdIsValid (DtmNextXid ));
771
813
XTM_INFO ("%d: Start global transaction %d, dtm->minXid=%d\n" , getpid (), DtmNextXid , dtm -> minXid );
0 commit comments