@@ -68,6 +68,7 @@ static void DtmInitialize(void);
68
68
static void DtmXactCallback (XactEvent event , void * arg );
69
69
static TransactionId DtmGetNextXid (void );
70
70
static TransactionId DtmGetNewTransactionId (bool isSubXact );
71
+ static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum );
71
72
72
73
static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
73
74
static bool TransactionIdIsInDoubt (TransactionId xid );
@@ -81,10 +82,11 @@ static Snapshot CurrentTransactionSnapshot;
81
82
82
83
static TransactionId DtmNextXid ;
83
84
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
85
+ static TransactionId DtmMinXid ;
84
86
static bool DtmHasGlobalSnapshot ;
85
87
static bool DtmIsGlobalTransaction ;
86
88
static int DtmLocalXidReserve ;
87
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId };
89
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin };
88
90
89
91
90
92
#define XTM_TRACE (fmt , ...)
@@ -163,9 +165,14 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
163
165
DumpSnapshot (src , "DTM" );
164
166
165
167
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
166
- if (src -> xmin < dst -> xmin ) dst -> xmin = src -> xmin ;
168
+ if (src -> xmin < dst -> xmin ) {
169
+ dst -> xmin = src -> xmin ;
170
+ ProcArrayInstallImportedXmin (src -> xmin , DtmNextXid );
171
+ //MyPgXact->xmin = TransactionXmin = src->xmin;
172
+ }
167
173
if (src -> xmax < dst -> xmax ) dst -> xmax = src -> xmax ;
168
174
175
+
169
176
n = dst -> xcnt ;
170
177
for (xid = dst -> xmax ; xid <= src -> xmin ; xid ++ ) {
171
178
dst -> xip [n ++ ] = xid ;
@@ -186,9 +193,20 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
186
193
DumpSnapshot (dst , "merged" );
187
194
}
188
195
196
+ static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum )
197
+ {
198
+ TransactionId xmin = GetOldestLocalXmin (rel , ignoreVacuum );
199
+ #if 0
200
+ if (TransactionIdIsValid (DtmSnapshot .xmin ) && TransactionIdPrecedes (DtmSnapshot .xmin , xmin )) {
201
+ xmin = DtmSnapshot .xmin ;
202
+ }
203
+ #endif
204
+ return xmin ;
205
+ }
206
+
189
207
static void DtmUpdateRecentXmin (void )
190
208
{
191
- TransactionId xmin = DtmSnapshot .xmin ;
209
+ TransactionId xmin = DtmMinXid ; // DtmSnapshot.xmin;
192
210
193
211
XTM_TRACE ("XTM: DtmUpdateRecentXmin \n" );
194
212
@@ -462,7 +480,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
462
480
463
481
if (TransactionIdIsValid (DtmNextXid )) {
464
482
if (!DtmHasGlobalSnapshot ) {
465
- DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot );
483
+ DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot , & DtmMinXid );
466
484
}
467
485
DtmMergeSnapshots (snapshot , & DtmSnapshot );
468
486
if (!IsolationUsesXactSnapshot ()) {
@@ -675,7 +693,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
675
693
int nParticipants = PG_GETARG_INT32 (0 );
676
694
Assert (!TransactionIdIsValid (DtmNextXid ));
677
695
678
- DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot );
696
+ DtmNextXid = DtmGlobalStartTransaction (nParticipants , & DtmSnapshot , & DtmMinXid );
679
697
Assert (TransactionIdIsValid (DtmNextXid ));
680
698
XTM_INFO ("%d: Start global transaction %d\n" , getpid (), DtmNextXid );
681
699
0 commit comments