Skip to content

Commit 99affcb

Browse files
committed
Add active snapshot
1 parent 3ddd266 commit 99affcb

File tree

5 files changed

+58
-37
lines changed

5 files changed

+58
-37
lines changed

contrib/pg_xtm/libdtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3636
// of xids reserved, and sets the 'first' xid accordingly. The number of xids
3737
// reserved is guaranteed to be at least nXids.
3838
// In other words, *first ≥ xid and result ≥ nXids.
39-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first);
39+
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active);
4040

4141
#endif

contrib/pg_xtm/pg_dtm.c

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ typedef struct
4848
LWLockId xidLock;
4949
TransactionId nextXid;
5050
size_t nReservedXids;
51+
SnapshotData activeSnapshot;
5152
} DtmState;
5253

5354

@@ -61,9 +62,11 @@ void _PG_fini(void);
6162

6263
static Snapshot DtmGetSnapshot(Snapshot snapshot);
6364
static void DtmMergeSnapshots(Snapshot dst, Snapshot src);
65+
static void DtmMergeWithActiveSnapshot(Snapshot snapshot);
66+
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
6467
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
6568
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
66-
static void DtmUpdateRecentXmin(void);
69+
static void DtmUpdateRecentXmin(Snapshot snapshot);
6770
static void DtmInitialize(void);
6871
static void DtmXactCallback(XactEvent event, void *arg);
6972
static TransactionId DtmGetNextXid(void);
@@ -144,10 +147,44 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
144147
return false;
145148
}
146149

150+
147151
static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
148152
{
149153
int i, j, n;
154+
TransactionId prev;
155+
156+
if (src->xmin < dst->xmin) {
157+
dst->xmin = src->xmin;
158+
}
159+
160+
n = dst->xcnt;
161+
Assert(src->xcnt + n <= GetMaxSnapshotXidCount());
162+
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
163+
n += src->xcnt;
164+
165+
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
166+
prev = InvalidTransactionId;
167+
168+
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
169+
if (dst->xip[i] != prev) {
170+
dst->xip[j++] = prev = dst->xip[i];
171+
}
172+
}
173+
dst->xcnt = j;
174+
}
175+
176+
static void DtmMergeWithActiveSnapshot(Snapshot dst)
177+
{
178+
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
179+
DtmMergeSnapshots(dst, &dtm->activeSnapshot);
180+
LWLockRelease(dtm->xidLock);
181+
}
182+
183+
static void DtmMergeWithGlobalSnapshot(Snapshot dst)
184+
{
185+
int i;
150186
TransactionId xid;
187+
Snapshot src = &DtmSnapshot;
151188

152189
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
153190

@@ -166,35 +203,15 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
166203
DumpSnapshot(dst, "local");
167204
DumpSnapshot(src, "DTM");
168205

169-
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
170-
if (src->xmin < dst->xmin) {
171-
dst->xmin = src->xmin;
172-
ProcArrayInstallImportedXmin(src->xmin, DtmNextXid);
173-
//MyPgXact->xmin = TransactionXmin = src->xmin;
174-
}
175206
if (src->xmax < dst->xmax) dst->xmax = src->xmax;
176207

177-
178-
n = dst->xcnt;
179-
for (xid = dst->xmax; xid <= src->xmin; xid++) {
180-
dst->xip[n++] = xid;
181-
}
182-
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
183-
n += src->xcnt;
184-
Assert(n <= GetMaxSnapshotXidCount());
185-
186-
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
187-
xid = InvalidTransactionId;
208+
DtmMergeSnapshots(dst, src);
188209

189-
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
190-
if (dst->xip[i] != xid) {
191-
dst->xip[j++] = xid = dst->xip[i];
192-
}
193-
}
194-
dst->xcnt = j;
195210
DumpSnapshot(dst, "merged");
196211
}
197212

213+
214+
198215
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
199216
{
200217
TransactionId localXmin = GetOldestLocalXmin(rel, ignoreVacuum);
@@ -211,7 +228,7 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
211228
return localXmin;
212229
}
213230

214-
static void DtmUpdateRecentXmin(void)
231+
static void DtmUpdateRecentXmin(Snapshot snapshot)
215232
{
216233
TransactionId xmin = DtmMinXid;//DtmSnapshot.xmin;
217234
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n", DtmMinXid, DtmSnapshot.xmin);
@@ -228,9 +245,10 @@ static void DtmUpdateRecentXmin(void)
228245
if (TransactionIdFollows(RecentGlobalXmin, xmin)) {
229246
RecentGlobalXmin = xmin;
230247
}
231-
if (TransactionIdFollows(RecentXmin, xmin)) {
232-
RecentXmin = xmin;
233-
}
248+
}
249+
if (TransactionIdFollows(RecentXmin, snapshot->xmin)) {
250+
ProcArrayInstallImportedXmin(snapshot->xmin, GetCurrentTransactionId());
251+
RecentXmin = snapshot->xmin;
234252
}
235253
}
236254

@@ -253,7 +271,7 @@ static TransactionId DtmGetNextXid()
253271
}
254272
} else {
255273
if (dtm->nReservedXids == 0) {
256-
dtm->nReservedXids = DtmGlobalReserve(ShmemVariableCache->nextXid, DtmLocalXidReserve, &dtm->nextXid);
274+
dtm->nReservedXids = DtmGlobalReserve(ShmemVariableCache->nextXid, DtmLocalXidReserve, &dtm->nextXid, &dtm->activeSnapshot);
257275
Assert(dtm->nReservedXids > 0);
258276
Assert(TransactionIdFollowsOrEquals(dtm->nextXid, ShmemVariableCache->nextXid));
259277

@@ -488,14 +506,15 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
488506
}
489507
DtmCurcid = snapshot->curcid;
490508
DtmLastSnapshot = snapshot;
491-
DtmMergeSnapshots(snapshot, &DtmSnapshot);
509+
DtmMergeWithGlobalSnapshot(snapshot);
492510
if (!IsolationUsesXactSnapshot()) {
493511
DtmHasGlobalSnapshot = false;
494512
}
495513
} else {
496514
snapshot = GetLocalSnapshotData(snapshot);
497515
}
498-
DtmUpdateRecentXmin();
516+
DtmMergeWithActiveSnapshot(snapshot);
517+
DtmUpdateRecentXmin(snapshot);
499518
CurrentTransactionSnapshot = snapshot;
500519
return snapshot;
501520
}
@@ -566,6 +585,8 @@ static void DtmInitialize()
566585
dtm->hashLock = LWLockAssign();
567586
dtm->xidLock = LWLockAssign();
568587
dtm->nReservedXids = 0;
588+
dtm->activeSnapshot.xip = (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
589+
dtm->activeSnapshot.subxip = (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
569590
}
570591
LWLockRelease(AddinShmemInitLock);
571592

contrib/pg_xtm/tests/transfers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
125125

126126
start := time.Now()
127127
for myCommits < N_ITERATIONS {
128-
amount := 2*rand.Intn(2000) - 1
129-
//amount := 1
128+
//amount := 2*rand.Intn(2000) - 1
129+
amount := 1
130130
account1 := rand.Intn(N_ACCOUNTS)
131131
account2 := rand.Intn(N_ACCOUNTS)
132132

src/backend/access/heap/visibilitymap.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
254254
Page page;
255255
char *map;
256256

257-
#if 0
257+
#if 1
258258
fprintf(stderr, "Visibilitymap cutoff %d, RecentLocalDataXmin=%d\n", cutoff_xid, RecentGlobalDataXmin);
259259
// return;
260260
#endif

src/backend/utils/time/tqual.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,8 +1165,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11651165
HeapTupleHeader tuple = htup->t_data;
11661166
TransactionId curxid = GetCurrentTransactionId();
11671167
if (TransactionIdIsNormal(curxid)) {
1168-
fprintf(stderr, "pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d) = %d\n",
1169-
getpid(), curxid, snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), result);
1168+
fprintf(stderr, "pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d} %x = %d\n",
1169+
getpid(), curxid, snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), tuple->t_infomask, result);
11701170
}
11711171
return result;
11721172
}

0 commit comments

Comments
 (0)