Skip to content

Commit c3fba52

Browse files
committed
Move DtmMinXid in shared memory
1 parent 16cb19c commit c3fba52

File tree

6 files changed

+62
-16
lines changed

6 files changed

+62
-16
lines changed

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,14 @@ static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
233233

234234
static xid_t get_global_xmin() {
235235
int i, j;
236-
xid_t xmin = INVALID_XID;
236+
xid_t xmin = next_gxid;
237237
Transaction *t;
238238
for (i = 0; i < transactions_count; i++) {
239239
t = transactions + i;
240240
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
241241
while (--j >= 0) {
242242
Snapshot* s = transaction_snapshot(t, j);
243-
if ((xmin == INVALID_XID) || (s->xmin < xmin)) {
243+
if (s->xmin < xmin) {
244244
xmin = s->xmin;
245245
}
246246
// minor TODO: Use 'times_sent' to generate a bit greater xmin?

contrib/pg_xtm/pg_dtm.c

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct
4646
{
4747
LWLockId hashLock;
4848
LWLockId xidLock;
49+
TransactionId minXid;
4950
TransactionId nextXid;
5051
size_t nReservedXids;
5152
SnapshotData activeSnapshot;
@@ -69,6 +70,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
6970
static void DtmUpdateRecentXmin(Snapshot snapshot);
7071
static void DtmInitialize(void);
7172
static void DtmXactCallback(XactEvent event, void *arg);
73+
static bool DtmTransactionIdIsInProgress(TransactionId xid);
7274
static TransactionId DtmGetNextXid(void);
7375
static TransactionId DtmGetNewTransactionId(bool isSubXact);
7476
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -85,13 +87,12 @@ static Snapshot CurrentTransactionSnapshot;
8587

8688
static TransactionId DtmNextXid;
8789
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
88-
static TransactionId DtmMinXid;
8990
static bool DtmHasGlobalSnapshot;
9091
static bool DtmIsGlobalTransaction;
9192
static int DtmLocalXidReserve;
9293
static int DtmCurcid;
9394
static Snapshot DtmLastSnapshot;
94-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin };
95+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, DtmTransactionIdIsInProgress };
9596

9697

9798
#define XTM_TRACE(fmt, ...)
@@ -182,7 +183,7 @@ static void DtmMergeWithActiveSnapshot(Snapshot dst)
182183
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
183184
for (i = 0, j = 0; i < src->xcnt; i++) {
184185
if (!TransactionIdIsInSnapshot(src->xip[i], dst)
185-
&& DtmGetTransactionStatus(src->xip[i], &lsn) == TRANSACTION_STATUS_IN_PROGRESS)
186+
&& DtmGetTransactionStatus(src->xip[i], &lsn) == TRANSACTION_STATUS_IN_PROGRESS)
186187
{
187188
src->xip[j++] = src->xip[i];
188189
}
@@ -228,7 +229,9 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
228229
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
229230
{
230231
TransactionId localXmin = GetOldestLocalXmin(rel, ignoreVacuum);
231-
TransactionId globalXmin = DtmMinXid;
232+
TransactionId globalXmin = dtm->minXid;
233+
XTM_INFO("XTM: DtmGetOldestXmin localXmin=%d, globalXmin=%d\n", localXmin, globalXmin);
234+
232235
if (TransactionIdIsValid(globalXmin)) {
233236
globalXmin -= vacuum_defer_cleanup_age;
234237
if (!TransactionIdIsNormal(globalXmin)) {
@@ -237,14 +240,15 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
237240
if (TransactionIdPrecedes(globalXmin, localXmin)) {
238241
localXmin = globalXmin;
239242
}
243+
XTM_INFO("XTM: DtmGetOldestXmin adjusted localXmin=%d, globalXmin=%d\n", localXmin, globalXmin);
240244
}
241245
return localXmin;
242246
}
243247

244248
static void DtmUpdateRecentXmin(Snapshot snapshot)
245249
{
246-
TransactionId xmin = DtmMinXid;//DtmSnapshot.xmin;
247-
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n", DtmMinXid, DtmSnapshot.xmin);
250+
TransactionId xmin = dtm->minXid;//DtmSnapshot.xmin;
251+
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n", dtm->minXid, DtmSnapshot.xmin);
248252

249253
if (TransactionIdIsValid(xmin)) {
250254
xmin -= vacuum_defer_cleanup_age;
@@ -272,6 +276,18 @@ static TransactionId DtmGetNextXid()
272276
if (TransactionIdIsValid(DtmNextXid)) {
273277
XTM_INFO("Use global XID %d\n", DtmNextXid);
274278
xid = DtmNextXid;
279+
280+
#ifdef SUPPORT_LOCAL_TRANSACTIONS
281+
{
282+
TransactionId* p;
283+
p = bsearch(&DtmNextXid, dtm->activeSnapshot.xip, dtm->activeSnapshot.xcnt, sizeof(TransactionId), xidComparator);
284+
if (p != NULL) {
285+
dtm->activeSnapshot.xcnt -= 1;
286+
memcpy(p, p+1, (dtm->activeSnapshot.xcnt - (p - dtm->activeSnapshot.xip))*sizeof(TransactionId));
287+
}
288+
}
289+
#endif
290+
275291
if (TransactionIdPrecedesOrEquals(ShmemVariableCache->nextXid, xid)) {
276292
while (TransactionIdPrecedes(ShmemVariableCache->nextXid, xid)) {
277293
XTM_INFO("Extend CLOG for global transaction to %d\n", ShmemVariableCache->nextXid);
@@ -307,7 +323,7 @@ static TransactionId DtmGetNextXid()
307323
return xid;
308324
}
309325

310-
TransactionId
326+
TransactionId
311327
DtmGetNewTransactionId(bool isSubXact)
312328
{
313329
TransactionId xid;
@@ -511,11 +527,30 @@ DtmGetNewTransactionId(bool isSubXact)
511527
}
512528

513529

530+
static bool DtmTransactionIdIsInProgress(TransactionId xid)
531+
{
532+
XLogRecPtr lsn;
533+
if (TransactionIdIsRunning(xid)) {
534+
return true;
535+
}
536+
#ifdef SUPPORT_LOCAL_TRANSACTIONS
537+
else if (DtmGetTransactionStatus(xid, &lsn) == TRANSACTION_STATUS_IN_PROGRESS) {
538+
bool globallyStarted;
539+
LWLockAcquire(dtm->xidLock, LW_SHARED);
540+
globallyStarted = bsearch(&xid, dtm->activeSnapshot.xip, dtm->activeSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL;
541+
LWLockRelease(dtm->xidLock);
542+
return globallyStarted;
543+
}
544+
#endif
545+
return false;
546+
}
547+
548+
514549
static Snapshot DtmGetSnapshot(Snapshot snapshot)
515550
{
516551
if (TransactionIdIsValid(DtmNextXid) /*&& IsMVCCSnapshot(snapshot)*/ && snapshot != &CatalogSnapshotData) {
517552
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid)) {
518-
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &DtmMinXid);
553+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
519554
}
520555
DtmCurcid = snapshot->curcid;
521556
DtmLastSnapshot = snapshot;
@@ -526,7 +561,9 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
526561
} else {
527562
snapshot = GetLocalSnapshotData(snapshot);
528563
}
564+
#ifdef SUPPORT_LOCAL_TRANSACTIONS
529565
DtmMergeWithActiveSnapshot(snapshot);
566+
#endif
530567
DtmUpdateRecentXmin(snapshot);
531568
CurrentTransactionSnapshot = snapshot;
532569
return snapshot;
@@ -598,6 +635,7 @@ static void DtmInitialize()
598635
dtm->hashLock = LWLockAssign();
599636
dtm->xidLock = LWLockAssign();
600637
dtm->nReservedXids = 0;
638+
dtm->minXid = InvalidTransactionId;
601639
dtm->activeSnapshot.xip = (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
602640
dtm->activeSnapshot.subxip = (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
603641
}
@@ -734,9 +772,9 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
734772
int nParticipants = PG_GETARG_INT32(0);
735773
Assert(!TransactionIdIsValid(DtmNextXid));
736774

737-
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot, &DtmMinXid);
775+
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot, &dtm->minXid);
738776
Assert(TransactionIdIsValid(DtmNextXid));
739-
XTM_INFO("%d: Start global transaction %d\n", getpid(), DtmNextXid);
777+
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);
740778

741779
DtmHasGlobalSnapshot = true;
742780
DtmIsGlobalTransaction = true;
@@ -750,9 +788,9 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
750788
Assert(!TransactionIdIsValid(DtmNextXid));
751789
DtmNextXid = PG_GETARG_INT32(0);
752790
Assert(TransactionIdIsValid(DtmNextXid));
753-
XTM_INFO("%d: Join global transaction %d\n", getpid(), DtmNextXid);
754791

755-
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &DtmMinXid);
792+
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
793+
XTM_INFO("%d: Join global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);
756794

757795
DtmHasGlobalSnapshot = true;
758796
DtmIsGlobalTransaction = true;

src/backend/access/transam/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#include "miscadmin.h"
4545
#include "pg_trace.h"
4646

47-
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, GetNewLocalTransactionId, GetOldestLocalXmin };
47+
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, GetNewLocalTransactionId, GetOldestLocalXmin, TransactionIdIsRunning };
4848
TransactionManager* TM = &DefaultTM;
4949

5050
/*

src/backend/storage/ipc/procarray.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,12 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
962962
LWLockRelease(ProcArrayLock);
963963
}
964964

965+
bool
966+
TransactionIdIsInProgress(TransactionId xid)
967+
{
968+
return TM->IsInProgress(xid);
969+
}
970+
965971
/*
966972
* TransactionIdIsInProgress -- is given transaction running in some backend
967973
*
@@ -989,7 +995,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
989995
* PGXACT again anyway; see GetNewTransactionId).
990996
*/
991997
bool
992-
TransactionIdIsInProgress(TransactionId xid)
998+
TransactionIdIsRunning(TransactionId xid)
993999
{
9941000
static TransactionId *xids = NULL;
9951001
int nxids = 0;

src/include/access/xtm.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ typedef struct
2222
Snapshot (*GetSnapshot)(Snapshot snapshot);
2323
TransactionId (*GetNewTransactionId)(bool isSubXact);
2424
TransactionId (*GetOldestXmin)(Relation rel, bool ignoreVacuum);
25+
bool (*IsInProgress)(TransactionId xid);
2526
} TransactionManager;
2627

2728
extern TransactionManager* TM;

src/include/storage/procarray.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ extern RunningTransactions GetRunningTransactionData(void);
5454

5555
extern bool TransactionIdIsRunning(TransactionId xid);
5656
extern bool TransactionIdIsInProgress(TransactionId xid);
57+
extern bool TransactionIdIsRunning(TransactionId xid);
5758
extern bool TransactionIdIsActive(TransactionId xid);
5859
extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
5960
extern TransactionId GetOldestLocalXmin(Relation rel, bool ignoreVacuum);

0 commit comments

Comments
 (0)