@@ -46,6 +46,7 @@ typedef struct
46
46
{
47
47
LWLockId hashLock ;
48
48
LWLockId xidLock ;
49
+ TransactionId minXid ;
49
50
TransactionId nextXid ;
50
51
size_t nReservedXids ;
51
52
SnapshotData activeSnapshot ;
@@ -69,6 +70,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
69
70
static void DtmUpdateRecentXmin (Snapshot snapshot );
70
71
static void DtmInitialize (void );
71
72
static void DtmXactCallback (XactEvent event , void * arg );
73
+ static bool DtmTransactionIdIsInProgress (TransactionId xid );
72
74
static TransactionId DtmGetNextXid (void );
73
75
static TransactionId DtmGetNewTransactionId (bool isSubXact );
74
76
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum );
@@ -85,13 +87,12 @@ static Snapshot CurrentTransactionSnapshot;
85
87
86
88
static TransactionId DtmNextXid ;
87
89
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
88
- static TransactionId DtmMinXid ;
89
90
static bool DtmHasGlobalSnapshot ;
90
91
static bool DtmIsGlobalTransaction ;
91
92
static int DtmLocalXidReserve ;
92
93
static int DtmCurcid ;
93
94
static Snapshot DtmLastSnapshot ;
94
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin };
95
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin , DtmTransactionIdIsInProgress };
95
96
96
97
97
98
#define XTM_TRACE (fmt , ...)
@@ -182,7 +183,7 @@ static void DtmMergeWithActiveSnapshot(Snapshot dst)
182
183
LWLockAcquire (dtm -> xidLock , LW_EXCLUSIVE );
183
184
for (i = 0 , j = 0 ; i < src -> xcnt ; i ++ ) {
184
185
if (!TransactionIdIsInSnapshot (src -> xip [i ], dst )
185
- && DtmGetTransactionStatus (src -> xip [i ], & lsn ) == TRANSACTION_STATUS_IN_PROGRESS )
186
+ && DtmGetTransactionStatus (src -> xip [i ], & lsn ) == TRANSACTION_STATUS_IN_PROGRESS )
186
187
{
187
188
src -> xip [j ++ ] = src -> xip [i ];
188
189
}
@@ -228,7 +229,9 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
228
229
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum )
229
230
{
230
231
TransactionId localXmin = GetOldestLocalXmin (rel , ignoreVacuum );
231
- TransactionId globalXmin = DtmMinXid ;
232
+ TransactionId globalXmin = dtm -> minXid ;
233
+ XTM_INFO ("XTM: DtmGetOldestXmin localXmin=%d, globalXmin=%d\n" , localXmin , globalXmin );
234
+
232
235
if (TransactionIdIsValid (globalXmin )) {
233
236
globalXmin -= vacuum_defer_cleanup_age ;
234
237
if (!TransactionIdIsNormal (globalXmin )) {
@@ -237,14 +240,15 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
237
240
if (TransactionIdPrecedes (globalXmin , localXmin )) {
238
241
localXmin = globalXmin ;
239
242
}
243
+ XTM_INFO ("XTM: DtmGetOldestXmin adjusted localXmin=%d, globalXmin=%d\n" , localXmin , globalXmin );
240
244
}
241
245
return localXmin ;
242
246
}
243
247
244
248
static void DtmUpdateRecentXmin (Snapshot snapshot )
245
249
{
246
- TransactionId xmin = DtmMinXid ;//DtmSnapshot.xmin;
247
- XTM_INFO ("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n" , DtmMinXid , DtmSnapshot .xmin );
250
+ TransactionId xmin = dtm -> minXid ;//DtmSnapshot.xmin;
251
+ XTM_INFO ("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n" , dtm -> minXid , DtmSnapshot .xmin );
248
252
249
253
if (TransactionIdIsValid (xmin )) {
250
254
xmin -= vacuum_defer_cleanup_age ;
@@ -272,6 +276,18 @@ static TransactionId DtmGetNextXid()
272
276
if (TransactionIdIsValid (DtmNextXid )) {
273
277
XTM_INFO ("Use global XID %d\n" , DtmNextXid );
274
278
xid = DtmNextXid ;
279
+
280
+ #ifdef SUPPORT_LOCAL_TRANSACTIONS
281
+ {
282
+ TransactionId * p ;
283
+ p = bsearch (& DtmNextXid , dtm -> activeSnapshot .xip , dtm -> activeSnapshot .xcnt , sizeof (TransactionId ), xidComparator );
284
+ if (p != NULL ) {
285
+ dtm -> activeSnapshot .xcnt -= 1 ;
286
+ memcpy (p , p + 1 , (dtm -> activeSnapshot .xcnt - (p - dtm -> activeSnapshot .xip ))* sizeof (TransactionId ));
287
+ }
288
+ }
289
+ #endif
290
+
275
291
if (TransactionIdPrecedesOrEquals (ShmemVariableCache -> nextXid , xid )) {
276
292
while (TransactionIdPrecedes (ShmemVariableCache -> nextXid , xid )) {
277
293
XTM_INFO ("Extend CLOG for global transaction to %d\n" , ShmemVariableCache -> nextXid );
@@ -307,7 +323,7 @@ static TransactionId DtmGetNextXid()
307
323
return xid ;
308
324
}
309
325
310
- TransactionId
326
+ TransactionId
311
327
DtmGetNewTransactionId (bool isSubXact )
312
328
{
313
329
TransactionId xid ;
@@ -511,11 +527,30 @@ DtmGetNewTransactionId(bool isSubXact)
511
527
}
512
528
513
529
530
+ static bool DtmTransactionIdIsInProgress (TransactionId xid )
531
+ {
532
+ XLogRecPtr lsn ;
533
+ if (TransactionIdIsRunning (xid )) {
534
+ return true;
535
+ }
536
+ #ifdef SUPPORT_LOCAL_TRANSACTIONS
537
+ else if (DtmGetTransactionStatus (xid , & lsn ) == TRANSACTION_STATUS_IN_PROGRESS ) {
538
+ bool globallyStarted ;
539
+ LWLockAcquire (dtm -> xidLock , LW_SHARED );
540
+ globallyStarted = bsearch (& xid , dtm -> activeSnapshot .xip , dtm -> activeSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL ;
541
+ LWLockRelease (dtm -> xidLock );
542
+ return globallyStarted ;
543
+ }
544
+ #endif
545
+ return false;
546
+ }
547
+
548
+
514
549
static Snapshot DtmGetSnapshot (Snapshot snapshot )
515
550
{
516
551
if (TransactionIdIsValid (DtmNextXid ) /*&& IsMVCCSnapshot(snapshot)*/ && snapshot != & CatalogSnapshotData ) {
517
552
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot -> curcid )) {
518
- DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot , & DtmMinXid );
553
+ DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot , & dtm -> minXid );
519
554
}
520
555
DtmCurcid = snapshot -> curcid ;
521
556
DtmLastSnapshot = snapshot ;
@@ -526,7 +561,9 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
526
561
} else {
527
562
snapshot = GetLocalSnapshotData (snapshot );
528
563
}
564
+ #ifdef SUPPORT_LOCAL_TRANSACTIONS
529
565
DtmMergeWithActiveSnapshot (snapshot );
566
+ #endif
530
567
DtmUpdateRecentXmin (snapshot );
531
568
CurrentTransactionSnapshot = snapshot ;
532
569
return snapshot ;
@@ -598,6 +635,7 @@ static void DtmInitialize()
598
635
dtm -> hashLock = LWLockAssign ();
599
636
dtm -> xidLock = LWLockAssign ();
600
637
dtm -> nReservedXids = 0 ;
638
+ dtm -> minXid = InvalidTransactionId ;
601
639
dtm -> activeSnapshot .xip = (TransactionId * )ShmemAlloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ));
602
640
dtm -> activeSnapshot .subxip = (TransactionId * )ShmemAlloc (GetMaxSnapshotSubxidCount () * sizeof (TransactionId ));
603
641
}
@@ -734,9 +772,9 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
734
772
int nParticipants = PG_GETARG_INT32 (0 );
735
773
Assert (!TransactionIdIsValid (DtmNextXid ));
736
774
737
- DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot , & DtmMinXid );
775
+ DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot , & dtm -> minXid );
738
776
Assert (TransactionIdIsValid (DtmNextXid ));
739
- XTM_INFO ("%d: Start global transaction %d\n" , getpid (), DtmNextXid );
777
+ XTM_INFO ("%d: Start global transaction %d, dtm->minXid=%d \n" , getpid (), DtmNextXid , dtm -> minXid );
740
778
741
779
DtmHasGlobalSnapshot = true;
742
780
DtmIsGlobalTransaction = true;
@@ -750,9 +788,9 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
750
788
Assert (!TransactionIdIsValid (DtmNextXid ));
751
789
DtmNextXid = PG_GETARG_INT32 (0 );
752
790
Assert (TransactionIdIsValid (DtmNextXid ));
753
- XTM_INFO ("%d: Join global transaction %d\n" , getpid (), DtmNextXid );
754
791
755
- DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot , & DtmMinXid );
792
+ DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot , & dtm -> minXid );
793
+ XTM_INFO ("%d: Join global transaction %d, dtm->minXid=%d\n" , getpid (), DtmNextXid , dtm -> minXid );
756
794
757
795
DtmHasGlobalSnapshot = true;
758
796
DtmIsGlobalTransaction = true;
0 commit comments