@@ -48,6 +48,7 @@ typedef struct
48
48
LWLockId xidLock ;
49
49
TransactionId nextXid ;
50
50
size_t nReservedXids ;
51
+ SnapshotData activeSnapshot ;
51
52
} DtmState ;
52
53
53
54
@@ -61,9 +62,11 @@ void _PG_fini(void);
61
62
62
63
static Snapshot DtmGetSnapshot (Snapshot snapshot );
63
64
static void DtmMergeSnapshots (Snapshot dst , Snapshot src );
65
+ static void DtmMergeWithActiveSnapshot (Snapshot snapshot );
66
+ static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
64
67
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
65
68
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
66
- static void DtmUpdateRecentXmin (void );
69
+ static void DtmUpdateRecentXmin (Snapshot snapshot );
67
70
static void DtmInitialize (void );
68
71
static void DtmXactCallback (XactEvent event , void * arg );
69
72
static TransactionId DtmGetNextXid (void );
@@ -144,10 +147,44 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
144
147
return false;
145
148
}
146
149
150
+
147
151
static void DtmMergeSnapshots (Snapshot dst , Snapshot src )
148
152
{
149
153
int i , j , n ;
154
+ TransactionId prev ;
155
+
156
+ if (src -> xmin < dst -> xmin ) {
157
+ dst -> xmin = src -> xmin ;
158
+ }
159
+
160
+ n = dst -> xcnt ;
161
+ Assert (src -> xcnt + n <= GetMaxSnapshotXidCount ());
162
+ memcpy (dst -> xip + n , src -> xip , src -> xcnt * sizeof (TransactionId ));
163
+ n += src -> xcnt ;
164
+
165
+ qsort (dst -> xip , n , sizeof (TransactionId ), xidComparator );
166
+ prev = InvalidTransactionId ;
167
+
168
+ for (i = 0 , j = 0 ; i < n && dst -> xip [i ] < dst -> xmax ; i ++ ) {
169
+ if (dst -> xip [i ] != prev ) {
170
+ dst -> xip [j ++ ] = prev = dst -> xip [i ];
171
+ }
172
+ }
173
+ dst -> xcnt = j ;
174
+ }
175
+
176
+ static void DtmMergeWithActiveSnapshot (Snapshot dst )
177
+ {
178
+ LWLockAcquire (dtm -> xidLock , LW_EXCLUSIVE );
179
+ DtmMergeSnapshots (dst , & dtm -> activeSnapshot );
180
+ LWLockRelease (dtm -> xidLock );
181
+ }
182
+
183
+ static void DtmMergeWithGlobalSnapshot (Snapshot dst )
184
+ {
185
+ int i ;
150
186
TransactionId xid ;
187
+ Snapshot src = & DtmSnapshot ;
151
188
152
189
Assert (TransactionIdIsValid (src -> xmin ) && TransactionIdIsValid (src -> xmax ));
153
190
@@ -166,35 +203,15 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
166
203
DumpSnapshot (dst , "local" );
167
204
DumpSnapshot (src , "DTM" );
168
205
169
- /* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
170
- if (src -> xmin < dst -> xmin ) {
171
- dst -> xmin = src -> xmin ;
172
- ProcArrayInstallImportedXmin (src -> xmin , DtmNextXid );
173
- //MyPgXact->xmin = TransactionXmin = src->xmin;
174
- }
175
206
if (src -> xmax < dst -> xmax ) dst -> xmax = src -> xmax ;
176
207
177
-
178
- n = dst -> xcnt ;
179
- for (xid = dst -> xmax ; xid <= src -> xmin ; xid ++ ) {
180
- dst -> xip [n ++ ] = xid ;
181
- }
182
- memcpy (dst -> xip + n , src -> xip , src -> xcnt * sizeof (TransactionId ));
183
- n += src -> xcnt ;
184
- Assert (n <= GetMaxSnapshotXidCount ());
185
-
186
- qsort (dst -> xip , n , sizeof (TransactionId ), xidComparator );
187
- xid = InvalidTransactionId ;
208
+ DtmMergeSnapshots (dst , src );
188
209
189
- for (i = 0 , j = 0 ; i < n && dst -> xip [i ] < dst -> xmax ; i ++ ) {
190
- if (dst -> xip [i ] != xid ) {
191
- dst -> xip [j ++ ] = xid = dst -> xip [i ];
192
- }
193
- }
194
- dst -> xcnt = j ;
195
210
DumpSnapshot (dst , "merged" );
196
211
}
197
212
213
+
214
+
198
215
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum )
199
216
{
200
217
TransactionId localXmin = GetOldestLocalXmin (rel , ignoreVacuum );
@@ -211,7 +228,7 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
211
228
return localXmin ;
212
229
}
213
230
214
- static void DtmUpdateRecentXmin (void )
231
+ static void DtmUpdateRecentXmin (Snapshot snapshot )
215
232
{
216
233
TransactionId xmin = DtmMinXid ;//DtmSnapshot.xmin;
217
234
XTM_INFO ("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n" , DtmMinXid , DtmSnapshot .xmin );
@@ -228,9 +245,10 @@ static void DtmUpdateRecentXmin(void)
228
245
if (TransactionIdFollows (RecentGlobalXmin , xmin )) {
229
246
RecentGlobalXmin = xmin ;
230
247
}
231
- if (TransactionIdFollows (RecentXmin , xmin )) {
232
- RecentXmin = xmin ;
233
- }
248
+ }
249
+ if (TransactionIdFollows (RecentXmin , snapshot -> xmin )) {
250
+ ProcArrayInstallImportedXmin (snapshot -> xmin , GetCurrentTransactionId ());
251
+ RecentXmin = snapshot -> xmin ;
234
252
}
235
253
}
236
254
@@ -253,7 +271,7 @@ static TransactionId DtmGetNextXid()
253
271
}
254
272
} else {
255
273
if (dtm -> nReservedXids == 0 ) {
256
- dtm -> nReservedXids = DtmGlobalReserve (ShmemVariableCache -> nextXid , DtmLocalXidReserve , & dtm -> nextXid );
274
+ dtm -> nReservedXids = DtmGlobalReserve (ShmemVariableCache -> nextXid , DtmLocalXidReserve , & dtm -> nextXid , & dtm -> activeSnapshot );
257
275
Assert (dtm -> nReservedXids > 0 );
258
276
Assert (TransactionIdFollowsOrEquals (dtm -> nextXid , ShmemVariableCache -> nextXid ));
259
277
@@ -488,14 +506,15 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
488
506
}
489
507
DtmCurcid = snapshot -> curcid ;
490
508
DtmLastSnapshot = snapshot ;
491
- DtmMergeSnapshots (snapshot , & DtmSnapshot );
509
+ DtmMergeWithGlobalSnapshot (snapshot );
492
510
if (!IsolationUsesXactSnapshot ()) {
493
511
DtmHasGlobalSnapshot = false;
494
512
}
495
513
} else {
496
514
snapshot = GetLocalSnapshotData (snapshot );
497
515
}
498
- DtmUpdateRecentXmin ();
516
+ DtmMergeWithActiveSnapshot (snapshot );
517
+ DtmUpdateRecentXmin (snapshot );
499
518
CurrentTransactionSnapshot = snapshot ;
500
519
return snapshot ;
501
520
}
@@ -566,6 +585,8 @@ static void DtmInitialize()
566
585
dtm -> hashLock = LWLockAssign ();
567
586
dtm -> xidLock = LWLockAssign ();
568
587
dtm -> nReservedXids = 0 ;
588
+ dtm -> activeSnapshot .xip = (TransactionId * )ShmemAlloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ));
589
+ dtm -> activeSnapshot .subxip = (TransactionId * )ShmemAlloc (GetMaxSnapshotSubxidCount () * sizeof (TransactionId ));
569
590
}
570
591
LWLockRelease (AddinShmemInitLock );
571
592
0 commit comments