Skip to content

Commit 7db1e23

Browse files
committed
Fisx bug in XMIN calculation in DTMD
2 parents 58f9465 + 4c7fa35 commit 7db1e23

File tree

8 files changed

+84
-41
lines changed

8 files changed

+84
-41
lines changed

contrib/pg_xtm/README

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ libdtm api
3939
void DtmInitSnapshot(Snapshot snapshot);
4040

4141
// Starts a new global transaction of nParticipants size. Returns the
42-
// transaction id and fills the snapshot on success. Returns INVALID_XID
42+
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
43+
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
4344
// otherwise.
44-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shapshot);
45+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
4546

46-
// Asks the DTM for a fresh snapshot.
47-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
47+
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
48+
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
49+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin);
4850

4951
// Commits transaction only once all participants have called this function,
5052
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
@@ -135,6 +137,8 @@ The commands:
135137

136138
The DTM replies with '+' followed by a snapshot in the form:
137139

138-
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
140+
<hex16 gxmin><hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
141+
142+
Where 'gxmin' is the smallest xmin among all available snapshots.
139143

140144
In case of a failure, the DTM replies with '-'.

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,24 @@ static void gen_snapshot(Transaction *t) {
225225
snapshot_sort(s);
226226
}
227227

228+
static xid_t get_global_xmin() {
229+
int i, j;
230+
xid_t xmin = MAX_XID;
231+
Transaction *t;
232+
for (i = 0; i < transactions_count; i++) {
233+
t = transactions + i;
234+
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
235+
while (--j >= 0) {
236+
Snapshot* s = transaction_snapshot(t, j);
237+
if (s->xmin < xmin) {
238+
xmin = s->xmin;
239+
}
240+
// minor TODO: Use 'times_sent' to generate a bit greater xmin?
241+
}
242+
}
243+
return xmin;
244+
}
245+
228246
static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
229247
CHECK(
230248
transactions_count < MAX_TRANSACTIONS,
@@ -270,8 +288,8 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
270288
return strdup("-");
271289
}
272290

273-
char head[1+16+1];
274-
sprintf(head, "+%016llx", t->xid);
291+
char head[1+16+16+1];
292+
sprintf(head, "+%016llx%016llx", t->xid, get_global_xmin());
275293

276294
transactions_count++;
277295

@@ -427,14 +445,18 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
427445
gen_snapshot(t);
428446
}
429447

448+
char head[1+16+1];
449+
sprintf(head, "+%016llx", get_global_xmin());
450+
430451
Snapshot *snap = transaction_snapshot(t, CLIENT_SNAPSENT(clientdata)++);
452+
snap->times_sent += 1;
431453
char *snapser = snapshot_serialize(snap);
432454

433455
// FIXME: Remote this assert if you do not have a barrier upon getting
434456
// snapshot in backends. The assert should indicate that situation :)
435457
assert(CLIENT_SNAPSENT(clientdata) == t->snapshots_count);
436458

437-
return destructive_concat(strdup("+"), snapser);
459+
return destructive_concat(strdup(head), snapser);
438460
}
439461

440462
static char *onstatus(void *stream, void *clientdata, cmd_t *cmd) {

contrib/pg_xtm/libdtm.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,14 @@ void DtmInitSnapshot(Snapshot snapshot)
278278
}
279279

280280
// Starts a new global transaction of nParticipants size. Returns the
281-
// transaction id and fills the snapshot on success. Returns INVALID_XID
281+
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
282+
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
282283
// otherwise.
283-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot)
284+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin)
284285
{
285286
bool ok;
286287
xid_t xid;
288+
xid_t number;
287289
DTMConn dtm = GetConnection();
288290

289291
// query
@@ -293,6 +295,8 @@ TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot)
293295
if (!dtm_read_bool(dtm, &ok)) goto failure;
294296
if (!ok) goto failure;
295297
if (!dtm_read_hex16(dtm, &xid)) goto failure;
298+
if (!dtm_read_hex16(dtm, &number)) goto failure;
299+
*gxmin = number;
296300
if (!dtm_read_snapshot(dtm, snapshot)) goto failure;
297301

298302
return xid;
@@ -301,10 +305,12 @@ TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot)
301305
return INVALID_XID;
302306
}
303307

304-
// Asks the DTM for a fresh snapshot.
305-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot)
308+
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
309+
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
310+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin)
306311
{
307312
bool ok;
313+
xid_t number;
308314
DTMConn dtm = GetConnection();
309315

310316
assert(snapshot != NULL);
@@ -316,6 +322,8 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot)
316322
if (!dtm_read_bool(dtm, &ok)) goto failure;
317323
if (!ok) goto failure;
318324

325+
if (!dtm_read_hex16(dtm, &number)) goto failure;
326+
*gxmin = number;
319327
if (!dtm_read_snapshot(dtm, snapshot)) goto failure;
320328
return;
321329
failure:
@@ -330,6 +338,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot)
330338
// Commits transaction only once all participants have called this function,
331339
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
332340
// to return only after the transaction is considered finished by the DTM.
341+
// Returns the status on success, or -1 otherwise.
333342
XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait)
334343
{
335344
bool ok;

contrib/pg_xtm/libdtm.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
void DtmInitSnapshot(Snapshot snapshot);
1212

1313
// Starts a new global transaction of nParticipants size. Returns the
14-
// transaction id and fills the snapshot on success. Returns INVALID_XID
14+
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
15+
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
1516
// otherwise.
16-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shapshot);
17+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
1718

18-
// Asks the DTM for a fresh snapshot.
19-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
19+
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
20+
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
21+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin);
2022

2123
// Commits transaction only once all participants have called this function,
2224
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call

contrib/pg_xtm/pg_dtm.c

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionSt
9090

9191

9292
#define XTM_TRACE(fmt, ...)
93-
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
94-
//#define XTM_INFO(fmt, ...)
93+
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
94+
#define XTM_INFO(fmt, ...)
9595

9696
static void DumpSnapshot(Snapshot s, char *name)
9797
{
@@ -195,20 +195,24 @@ static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
195195

196196
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
197197
{
198-
TransactionId xmin = GetOldestLocalXmin(rel, ignoreVacuum);
199-
#if 0
200-
if (TransactionIdIsValid(DtmSnapshot.xmin) && TransactionIdPrecedes(DtmSnapshot.xmin, xmin)) {
201-
xmin = DtmSnapshot.xmin;
198+
TransactionId localXmin = GetOldestLocalXmin(rel, ignoreVacuum);
199+
TransactionId globalXmin = DtmMinXid;
200+
if (TransactionIdIsValid(globalXmin)) {
201+
globalXmin -= vacuum_defer_cleanup_age;
202+
if (!TransactionIdIsNormal(globalXmin)) {
203+
globalXmin = FirstNormalTransactionId;
204+
}
205+
if (TransactionIdPrecedes(globalXmin, localXmin)) {
206+
localXmin = globalXmin;
207+
}
202208
}
203-
#endif
204-
return xmin;
209+
return localXmin;
205210
}
206211

207212
static void DtmUpdateRecentXmin(void)
208213
{
209214
TransactionId xmin = DtmMinXid;//DtmSnapshot.xmin;
210-
211-
XTM_TRACE("XTM: DtmUpdateRecentXmin \n");
215+
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n", DtmMinXid, DtmSnapshot.xmin);
212216

213217
if (TransactionIdIsValid(xmin)) {
214218
xmin -= vacuum_defer_cleanup_age;
@@ -394,7 +398,6 @@ DtmGetNewTransactionId(bool isSubXact)
394398
* Extend pg_subtrans and pg_commit_ts too.
395399
*/
396400
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->nextXid)) {
397-
fprintf(stderr, "Extend CLOG to %d\n", xid);
398401
ExtendCLOG(xid);
399402
ExtendCommitTs(xid);
400403
ExtendSUBTRANS(xid);
@@ -477,18 +480,17 @@ DtmGetNewTransactionId(bool isSubXact)
477480

478481
static Snapshot DtmGetSnapshot(Snapshot snapshot)
479482
{
480-
481483
if (TransactionIdIsValid(DtmNextXid)) {
482484
if (!DtmHasGlobalSnapshot) {
483485
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &DtmMinXid);
484486
}
485487
DtmMergeSnapshots(snapshot, &DtmSnapshot);
486-
if (!IsolationUsesXactSnapshot()) {
487-
DtmHasGlobalSnapshot = false;
488-
}
488+
if (!IsolationUsesXactSnapshot()) {
489+
DtmHasGlobalSnapshot = false;
490+
}
489491
} else {
490-
snapshot = GetLocalSnapshotData(snapshot);
491-
}
492+
snapshot = GetLocalSnapshotData(snapshot);
493+
}
492494
DtmUpdateRecentXmin();
493495
CurrentTransactionSnapshot = snapshot;
494496
return snapshot;
@@ -710,7 +712,7 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
710712
Assert(TransactionIdIsValid(DtmNextXid));
711713
XTM_INFO("%d: Join global transaction %d\n", getpid(), DtmNextXid);
712714

713-
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot);
715+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &DtmMinXid);
714716

715717
DtmHasGlobalSnapshot = true;
716718
DtmIsGlobalTransaction = true;

src/backend/access/heap/pruneheap.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
9898
OldestXmin = RecentGlobalXmin;
9999
else
100100
OldestXmin = RecentGlobalDataXmin;
101-
fprintf(stderr, "PRUNEHEAP: xmin=%d\n", OldestXmin);
101+
//fprintf(stderr, "pid=%d PRUNEHEAP: xmin=%d\n", getpid(), OldestXmin);
102102
Assert(TransactionIdIsValid(OldestXmin));
103103

104104
/*

src/backend/access/heap/visibilitymap.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,10 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
254254
Page page;
255255
char *map;
256256

257+
#if 0
257258
fprintf(stderr, "Visibilitymap cutoff %d, RecentLocalDataXmin=%d\n", cutoff_xid, RecentGlobalDataXmin);
258-
return;
259-
259+
// return;
260+
#endif
260261
#ifdef TRACE_VISIBILITYMAP
261262
elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
262263
#endif

src/backend/utils/time/tqual.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
970970
* and more contention on the PGXACT array.
971971
*/
972972
bool
973-
_HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
973+
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
974974
Buffer buffer)
975975
{
976976
HeapTupleHeader tuple = htup->t_data;
@@ -1156,15 +1156,18 @@ _HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11561156

11571157
return false;
11581158
}
1159-
#if 1
1159+
#if 0
11601160
bool
11611161
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11621162
Buffer buffer)
11631163
{
11641164
bool result = _HeapTupleSatisfiesMVCC(htup, snapshot, buffer);
11651165
HeapTupleHeader tuple = htup->t_data;
1166-
fprintf(stderr, "Transaction %d, [%d,%d) visibility check for tuple {%d,%d) = %d\n",
1167-
GetCurrentTransactionId(), snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), result);
1166+
TransactionId curxid = GetCurrentTransactionId();
1167+
if (TransactionIdIsNormal(curxid)) {
1168+
fprintf(stderr, "pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d) = %d\n",
1169+
getpid(), curxid, snapshot->xmin, snapshot->xmax, HeapTupleHeaderGetRawXmin(tuple), HeapTupleHeaderGetRawXmax(tuple), result);
1170+
}
11681171
return result;
11691172
}
11701173
#endif

0 commit comments

Comments
 (0)