Skip to content

Commit 4c7fa35

Browse files
committed
Add global xmin logic to DTM.
1 parent adbbd19 commit 4c7fa35

File tree

7 files changed

+91
-33
lines changed

7 files changed

+91
-33
lines changed

contrib/pg_xtm/README

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ libdtm api
3939
void DtmInitSnapshot(Snapshot snapshot);
4040

4141
// Starts a new global transaction of nParticipants size. Returns the
42-
// transaction id and fills the snapshot on success. Returns INVALID_XID
42+
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
43+
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
4344
// otherwise.
44-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shapshot);
45+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
4546

46-
// Asks the DTM for a fresh snapshot.
47-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
47+
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
48+
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
49+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin);
4850

4951
// Commits transaction only once all participants have called this function,
5052
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
@@ -135,6 +137,8 @@ The commands:
135137

136138
The DTM replies with '+' followed by a snapshot in the form:
137139

138-
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
140+
<hex16 gxmin><hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
141+
142+
Where 'gxmin' is the smallest xmin among all available snapshots.
139143

140144
In case of a failure, the DTM replies with '-'.

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,28 @@ static void gen_snapshot(Transaction *t) {
225225
snapshot_sort(s);
226226
}
227227

228+
static xid_t get_global_xmin() {
229+
int i;
230+
xid_t xmin = MAX_XID;
231+
Transaction *t;
232+
for (i = 0; i < transactions_count; i++) {
233+
t = transactions + i;
234+
if (t->snapshots_count > 0) {
235+
int new = t->snapshots_count - 1 + MAX_SNAPSHOTS_PER_TRANS;
236+
int old = new - MAX_SNAPSHOTS_PER_TRANS + 1;
237+
int si;
238+
for (si = new; si >= old; si--) {
239+
Snapshot *s = t->snapshots + si % MAX_SNAPSHOTS_PER_TRANS;
240+
if (s->xmin < xmin) {
241+
xmin = s->xmin;
242+
}
243+
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
244+
}
245+
}
246+
}
247+
return xmin;
248+
}
249+
228250
static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
229251
CHECK(
230252
transactions_count < MAX_TRANSACTIONS,
@@ -270,8 +292,8 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
270292
return strdup("-");
271293
}
272294

273-
char head[1+16+1];
274-
sprintf(head, "+%016llx", t->xid);
295+
char head[1+16+16+1];
296+
sprintf(head, "+%016llx%016llx", t->xid, get_global_xmin());
275297

276298
transactions_count++;
277299

@@ -427,14 +449,18 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
427449
gen_snapshot(t);
428450
}
429451

452+
char head[1+16+1];
453+
sprintf(head, "+%016llx", get_global_xmin());
454+
430455
Snapshot *snap = transaction_snapshot(t, CLIENT_SNAPSENT(clientdata)++);
456+
snap->times_sent += 1;
431457
char *snapser = snapshot_serialize(snap);
432458

433459
// FIXME: Remote this assert if you do not have a barrier upon getting
434460
// snapshot in backends. The assert should indicate that situation :)
435461
assert(CLIENT_SNAPSENT(clientdata) == t->snapshots_count);
436462

437-
return destructive_concat(strdup("+"), snapser);
463+
return destructive_concat(strdup(head), snapser);
438464
}
439465

440466
static char *onstatus(void *stream, void *clientdata, cmd_t *cmd) {

contrib/pg_xtm/libdtm.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,14 @@ void DtmInitSnapshot(Snapshot snapshot)
278278
}
279279

280280
// Starts a new global transaction of nParticipants size. Returns the
281-
// transaction id and fills the snapshot on success. Returns INVALID_XID
281+
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
282+
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
282283
// otherwise.
283-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot)
284+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin)
284285
{
285286
bool ok;
286287
xid_t xid;
288+
xid_t number;
287289
DTMConn dtm = GetConnection();
288290

289291
// query
@@ -293,6 +295,8 @@ TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot)
293295
if (!dtm_read_bool(dtm, &ok)) goto failure;
294296
if (!ok) goto failure;
295297
if (!dtm_read_hex16(dtm, &xid)) goto failure;
298+
if (!dtm_read_hex16(dtm, &number)) goto failure;
299+
*gxmin = number;
296300
if (!dtm_read_snapshot(dtm, snapshot)) goto failure;
297301

298302
return xid;
@@ -301,10 +305,12 @@ TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot)
301305
return INVALID_XID;
302306
}
303307

304-
// Asks the DTM for a fresh snapshot.
305-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot)
308+
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
309+
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
310+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin)
306311
{
307312
bool ok;
313+
xid_t number;
308314
DTMConn dtm = GetConnection();
309315

310316
assert(snapshot != NULL);
@@ -316,6 +322,8 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot)
316322
if (!dtm_read_bool(dtm, &ok)) goto failure;
317323
if (!ok) goto failure;
318324

325+
if (!dtm_read_hex16(dtm, &number)) goto failure;
326+
*gxmin = number;
319327
if (!dtm_read_snapshot(dtm, snapshot)) goto failure;
320328
return;
321329
failure:
@@ -330,6 +338,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot)
330338
// Commits transaction only once all participants have called this function,
331339
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
332340
// to return only after the transaction is considered finished by the DTM.
341+
// Returns the status on success, or -1 otherwise.
333342
XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait)
334343
{
335344
bool ok;

contrib/pg_xtm/libdtm.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
void DtmInitSnapshot(Snapshot snapshot);
1212

1313
// Starts a new global transaction of nParticipants size. Returns the
14-
// transaction id and fills the snapshot on success. Returns INVALID_XID
14+
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
15+
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
1516
// otherwise.
16-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shapshot);
17+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
1718

18-
// Asks the DTM for a fresh snapshot.
19-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
19+
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
20+
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
21+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin);
2022

2123
// Commits transaction only once all participants have called this function,
2224
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call

contrib/pg_xtm/pg_dtm.c

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
6363
static void DtmMergeSnapshots(Snapshot dst, Snapshot src);
6464
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
6565
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
66-
static void DtmUpdateRecentXmin(void);
66+
static void DtmUpdateRecentXmin(TransactionId xmin);
6767
static void DtmInitialize(void);
6868
static void DtmXactCallback(XactEvent event, void *arg);
6969
static TransactionId DtmGetNextXid(void);
@@ -186,10 +186,8 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
186186
DumpSnapshot(dst, "merged");
187187
}
188188

189-
static void DtmUpdateRecentXmin(void)
189+
static void DtmUpdateRecentXmin(TransactionId xmin)
190190
{
191-
TransactionId xmin = DtmSnapshot.xmin;
192-
193191
XTM_TRACE("XTM: DtmUpdateRecentXmin \n");
194192

195193
if (TransactionIdIsValid(xmin)) {
@@ -207,6 +205,9 @@ static void DtmUpdateRecentXmin(void)
207205
if (TransactionIdFollows(RecentXmin, xmin)) {
208206
RecentXmin = xmin;
209207
}
208+
if (TransactionIdFollows(MyPgXact->xmin, xmin)) {
209+
MyPgXact->xmin = xmin;
210+
}
210211
}
211212
}
212213

@@ -459,19 +460,21 @@ DtmGetNewTransactionId(bool isSubXact)
459460

460461
static Snapshot DtmGetSnapshot(Snapshot snapshot)
461462
{
462-
463463
if (TransactionIdIsValid(DtmNextXid)) {
464464
if (!DtmHasGlobalSnapshot) {
465-
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
465+
TransactionId gxmin = InvalidTransactionId;
466+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &gxmin);
467+
if (TransactionIdIsValid(gxmin)) {
468+
DtmUpdateRecentXmin(gxmin);
469+
}
466470
}
467471
DtmMergeSnapshots(snapshot, &DtmSnapshot);
468-
if (!IsolationUsesXactSnapshot()) {
469-
DtmHasGlobalSnapshot = false;
470-
}
472+
if (!IsolationUsesXactSnapshot()) {
473+
DtmHasGlobalSnapshot = false;
474+
}
471475
} else {
472-
snapshot = GetLocalSnapshotData(snapshot);
473-
}
474-
DtmUpdateRecentXmin();
476+
snapshot = GetLocalSnapshotData(snapshot);
477+
}
475478
CurrentTransactionSnapshot = snapshot;
476479
return snapshot;
477480
}
@@ -672,10 +675,16 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
672675
Datum
673676
dtm_begin_transaction(PG_FUNCTION_ARGS)
674677
{
678+
TransactionId gxmin;
675679
int nParticipants = PG_GETARG_INT32(0);
676680
Assert(!TransactionIdIsValid(DtmNextXid));
677681

678-
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot);
682+
gxmin = InvalidTransactionId;
683+
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot, &gxmin);
684+
if (TransactionIdIsValid(gxmin)) {
685+
DtmUpdateRecentXmin(gxmin);
686+
}
687+
679688
Assert(TransactionIdIsValid(DtmNextXid));
680689
XTM_INFO("%d: Start global transaction %d\n", getpid(), DtmNextXid);
681690

@@ -687,12 +696,17 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
687696

688697
Datum dtm_join_transaction(PG_FUNCTION_ARGS)
689698
{
699+
TransactionId gxmin;
690700
Assert(!TransactionIdIsValid(DtmNextXid));
691701
DtmNextXid = PG_GETARG_INT32(0);
692702
Assert(TransactionIdIsValid(DtmNextXid));
693703
XTM_INFO("%d: Join global transaction %d\n", getpid(), DtmNextXid);
694704

695-
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
705+
gxmin = InvalidTransactionId;
706+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &gxmin);
707+
if (TransactionIdIsValid(gxmin)) {
708+
DtmUpdateRecentXmin(gxmin);
709+
}
696710

697711
DtmHasGlobalSnapshot = true;
698712
DtmIsGlobalTransaction = true;

src/backend/access/heap/pruneheap.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
9898
OldestXmin = RecentGlobalXmin;
9999
else
100100
OldestXmin = RecentGlobalDataXmin;
101-
fprintf(stderr, "PRUNEHEAP: xmin=%d\n", OldestXmin);
101+
fprintf(stderr, "pid=%d PRUNEHEAP: xmin=%d\n", getpid(), OldestXmin);
102102
Assert(TransactionIdIsValid(OldestXmin));
103103

104104
/*

src/backend/utils/time/tqual.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,8 +1163,11 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11631163
{
11641164
bool result = _HeapTupleSatisfiesMVCC(htup, snapshot, buffer);
11651165
HeapTupleHeader tuple = htup->t_data;
1166-
fprintf(stderr, "Transaction %d, [%d,%d) visibility check for tuple {%d,%d) = %d\n",
1167-
GetCurrentTransactionId(), snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), result);
1166+
TransactionId curxid = GetCurrentTransactionId();
1167+
if (TransactionIdIsNormal(curxid)) {
1168+
fprintf(stderr, "pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d) = %d\n",
1169+
getpid(), curxid, snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), result);
1170+
}
11681171
return result;
11691172
}
11701173
#endif

0 commit comments

Comments
 (0)