Skip to content

Commit 473f388

Browse files
committed
Move GetNewTransactionId to XTM API
1 parent d7c5088 commit 473f388

File tree

7 files changed

+36
-213
lines changed

7 files changed

+36
-213
lines changed

contrib/pg_xtm/dtmd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC=gcc
2-
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE
2+
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE -DDEBUG
33
LIBUV_PREFIX=$(HOME)/libuv-build
44
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
55
LIBUV_LDFLAGS=-luv -pthread

contrib/pg_xtm/dtmd/include/clogfile.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#define BITS_PER_COMMIT 2
1212
#define COMMIT_MASK ((1 << BITS_PER_COMMIT) - 1)
1313
#define COMMITS_PER_BYTE 4
14-
#define COMMITS_PER_FILE 0x100000000
14+
#define COMMITS_PER_FILE 0x10000000
1515
#define BYTES_PER_FILE ((COMMITS_PER_FILE) / (COMMITS_PER_BYTE))
1616
#define XID_TO_FILEID(XID) ((XID) / (COMMITS_PER_FILE))
1717
#define XID_TO_OFFSET(XID) (((XID) % (COMMITS_PER_FILE)) / (COMMITS_PER_BYTE))

contrib/pg_xtm/pg_dtm.c

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "access/xact.h"
2222
#include "access/xtm.h"
2323
#include "access/transam.h"
24+
#include "access/subtrans.h"
25+
#include "access/commit_ts.h"
2426
#include "access/xlog.h"
2527
#include "storage/proc.h"
2628
#include "storage/procarray.h"
@@ -109,6 +111,7 @@ static bool TransactionIdIsInDtmSnapshot(TransactionId xid)
109111
|| bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL;
110112
}
111113

114+
112115
static bool TransactionIdIsInDoubt(TransactionId xid)
113116
{
114117
bool inDoubt;
@@ -118,8 +121,8 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
118121
inDoubt = hash_search(xid_in_doubt, &xid, HASH_FIND, NULL) != NULL;
119122
LWLockRelease(dtm->hashLock);
120123
if (!inDoubt) {
121-
XLogRecPtr lsn;
122-
inDoubt = CLOGTransactionIdGetStatus(xid, &lsn) != TRANSACTION_STATUS_IN_PROGRESS;
124+
XLogRecPtr lsn;
125+
inDoubt = DtmGetTransactionStatus(xid, &lsn) != TRANSACTION_STATUS_IN_PROGRESS;
123126
}
124127
if (inDoubt) {
125128
XTM_INFO("Wait for transaction %d to complete\n", xid);
@@ -187,6 +190,7 @@ static void DtmUpdateRecentXmin(void)
187190

188191
if (TransactionIdIsValid(xmin)) {
189192
xmin -= vacuum_defer_cleanup_age;
193+
xmin = FirstNormalTransactionId;
190194
if (!TransactionIdIsNormal(xmin)) {
191195
xmin = FirstNormalTransactionId;
192196
}
@@ -206,16 +210,30 @@ static TransactionId DtmGetNextXid()
206210
if (TransactionIdIsValid(DtmNextXid)) {
207211
XTM_INFO("Use global XID %d\n", DtmNextXid);
208212
xid = DtmNextXid;
209-
if (ShmemVariableCache->nextXid <= xid) {
213+
if (TransactionIdPrecedesOrEquals(ShmemVariableCache->nextXid, xid)) {
214+
while (TransactionIdPrecedes(ShmemVariableCache->nextXid, xid)) {
215+
XTM_INFO("Extend CLOG for global transaction to %d\n", ShmemVariableCache->nextXid);
216+
ExtendCLOG(ShmemVariableCache->nextXid);
217+
ExtendCommitTs(ShmemVariableCache->nextXid);
218+
ExtendSUBTRANS(ShmemVariableCache->nextXid);
219+
TransactionIdAdvance(ShmemVariableCache->nextXid);
220+
}
210221
dtm->nReservedXids = 0;
211-
ShmemVariableCache->nextXid = xid;
212222
}
213223
} else {
214224
if (dtm->nReservedXids == 0) {
215225
dtm->nReservedXids = DtmGlobalReserve(ShmemVariableCache->nextXid, DtmLocalXidReserve, &dtm->nextXid);
216226
Assert(dtm->nReservedXids > 0);
217227
Assert(TransactionIdFollowsOrEquals(dtm->nextXid, ShmemVariableCache->nextXid));
218-
ShmemVariableCache->nextXid = dtm->nextXid;
228+
229+
while (TransactionIdPrecedes(ShmemVariableCache->nextXid, dtm->nextXid)) {
230+
XTM_INFO("Extend CLOG for local transaction to %d\n", ShmemVariableCache->nextXid);
231+
ExtendCLOG(ShmemVariableCache->nextXid);
232+
ExtendCommitTs(ShmemVariableCache->nextXid);
233+
ExtendSUBTRANS(ShmemVariableCache->nextXid);
234+
TransactionIdAdvance(ShmemVariableCache->nextXid);
235+
}
236+
Assert(ShmemVariableCache->nextXid == dtm->nextXid);
219237
} else {
220238
Assert(ShmemVariableCache->nextXid == dtm->nextXid);
221239
}
@@ -248,8 +266,10 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
248266

249267
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
250268
{
251-
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
252-
XTM_TRACE("XTM: DtmGetTransactionStatus \n");
269+
XidStatus status = xid >= ShmemVariableCache->nextXid
270+
? TRANSACTION_STATUS_IN_PROGRESS
271+
: CLOGTransactionIdGetStatus(xid, lsn);
272+
XTM_TRACE("XTM: DtmGetTransactionStatus\n");
253273
return status;
254274
}
255275

src/backend/access/transam/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#include "miscadmin.h"
4545
#include "pg_trace.h"
4646

47-
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, GetNextTransactionId };
47+
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, GetNewLocalTransactionId };
4848
TransactionManager* TM = &DefaultTM;
4949

5050
/*

src/backend/access/transam/varsup.c

Lines changed: 4 additions & 201 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@
3434
/* pointer to "variable cache" in shared memory (set up by shmem.c) */
3535
VariableCache ShmemVariableCache = NULL;
3636

37-
TransactionId GetNextTransactionId()
37+
TransactionId
38+
GetNewTransactionId(bool isSubXact)
3839
{
39-
return ShmemVariableCache->nextXid;
40+
return TM->GetNewTransactionId(isSubXactShmemVariableCache->nextXid;
4041
}
4142

4243
/*
@@ -50,206 +51,8 @@ TransactionId GetNextTransactionId()
5051
* issue a warning about XID wrap.
5152
*/
5253
TransactionId
53-
GetNewTransactionId(bool isSubXact)
54+
GetNewLocalTransactionId(bool isSubXact)
5455
{
55-
TransactionId xid;
56-
57-
/*
58-
* Workers synchronize transaction state at the beginning of each parallel
59-
* operation, so we can't account for new XIDs after that point.
60-
*/
61-
if (IsInParallelMode())
62-
elog(ERROR, "cannot assign TransactionIds during a parallel operation");
63-
64-
/*
65-
* During bootstrap initialization, we return the special bootstrap
66-
* transaction id.
67-
*/
68-
if (IsBootstrapProcessingMode())
69-
{
70-
Assert(!isSubXact);
71-
MyPgXact->xid = BootstrapTransactionId;
72-
return BootstrapTransactionId;
73-
}
74-
75-
/* safety check, we should never get this far in a HS slave */
76-
if (RecoveryInProgress())
77-
elog(ERROR, "cannot assign TransactionIds during recovery");
78-
79-
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
80-
81-
xid = TM->GetNextXid();
82-
83-
/*----------
84-
* Check to see if it's safe to assign another XID. This protects against
85-
* catastrophic data loss due to XID wraparound. The basic rules are:
86-
*
87-
* If we're past xidVacLimit, start trying to force autovacuum cycles.
88-
* If we're past xidWarnLimit, start issuing warnings.
89-
* If we're past xidStopLimit, refuse to execute transactions, unless
90-
* we are running in single-user mode (which gives an escape hatch
91-
* to the DBA who somehow got past the earlier defenses).
92-
*
93-
* Note that this coding also appears in GetNewMultiXactId.
94-
*----------
95-
*/
96-
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
97-
{
98-
/*
99-
* For safety's sake, we release XidGenLock while sending signals,
100-
* warnings, etc. This is not so much because we care about
101-
* preserving concurrency in this situation, as to avoid any
102-
* possibility of deadlock while doing get_database_name(). First,
103-
* copy all the shared values we'll need in this path.
104-
*/
105-
TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
106-
TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
107-
TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
108-
Oid oldest_datoid = ShmemVariableCache->oldestXidDB;
109-
110-
LWLockRelease(XidGenLock);
111-
112-
/*
113-
* To avoid swamping the postmaster with signals, we issue the autovac
114-
* request only once per 64K transaction starts. This still gives
115-
* plenty of chances before we get into real trouble.
116-
*/
117-
if (IsUnderPostmaster && (xid % 65536) == 0)
118-
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
119-
120-
if (IsUnderPostmaster &&
121-
TransactionIdFollowsOrEquals(xid, xidStopLimit))
122-
{
123-
char *oldest_datname = get_database_name(oldest_datoid);
124-
125-
/* complain even if that DB has disappeared */
126-
if (oldest_datname)
127-
ereport(ERROR,
128-
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
129-
errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
130-
oldest_datname),
131-
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
132-
"You might also need to commit or roll back old prepared transactions.")));
133-
else
134-
ereport(ERROR,
135-
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
136-
errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
137-
oldest_datoid),
138-
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
139-
"You might also need to commit or roll back old prepared transactions.")));
140-
}
141-
else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
142-
{
143-
char *oldest_datname = get_database_name(oldest_datoid);
144-
145-
/* complain even if that DB has disappeared */
146-
if (oldest_datname)
147-
ereport(WARNING,
148-
(errmsg("database \"%s\" must be vacuumed within %u transactions",
149-
oldest_datname,
150-
xidWrapLimit - xid),
151-
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
152-
"You might also need to commit or roll back old prepared transactions.")));
153-
else
154-
ereport(WARNING,
155-
(errmsg("database with OID %u must be vacuumed within %u transactions",
156-
oldest_datoid,
157-
xidWrapLimit - xid),
158-
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
159-
"You might also need to commit or roll back old prepared transactions.")));
160-
}
161-
162-
/* Re-acquire lock and start over */
163-
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
164-
xid = TM->GetNextXid();
165-
}
166-
167-
/*
168-
* If we are allocating the first XID of a new page of the commit log,
169-
* zero out that commit-log page before returning. We must do this while
170-
* holding XidGenLock, else another xact could acquire and commit a later
171-
* XID before we zero the page. Fortunately, a page of the commit log
172-
* holds 32K or more transactions, so we don't have to do this very often.
173-
*
174-
* Extend pg_subtrans and pg_commit_ts too.
175-
*/
176-
ExtendCLOG(xid);
177-
ExtendCommitTs(xid);
178-
ExtendSUBTRANS(xid);
179-
180-
/*
181-
* Now advance the nextXid counter. This must not happen until after we
182-
* have successfully completed ExtendCLOG() --- if that routine fails, we
183-
* want the next incoming transaction to try it again. We cannot assign
184-
* more XIDs until there is CLOG space for them.
185-
*/
186-
if (xid == ShmemVariableCache->nextXid) {
187-
TransactionIdAdvance(ShmemVariableCache->nextXid);
188-
} else {
189-
Assert(TransactionIdPrecedes(xid, ShmemVariableCache->nextXid));
190-
}
191-
192-
/*
193-
* We must store the new XID into the shared ProcArray before releasing
194-
* XidGenLock. This ensures that every active XID older than
195-
* latestCompletedXid is present in the ProcArray, which is essential for
196-
* correct OldestXmin tracking; see src/backend/access/transam/README.
197-
*
198-
* XXX by storing xid into MyPgXact without acquiring ProcArrayLock, we
199-
* are relying on fetch/store of an xid to be atomic, else other backends
200-
* might see a partially-set xid here. But holding both locks at once
201-
* would be a nasty concurrency hit. So for now, assume atomicity.
202-
*
203-
* Note that readers of PGXACT xid fields should be careful to fetch the
204-
* value only once, rather than assume they can read a value multiple
205-
* times and get the same answer each time.
206-
*
207-
* The same comments apply to the subxact xid count and overflow fields.
208-
*
209-
* A solution to the atomic-store problem would be to give each PGXACT its
210-
* own spinlock used only for fetching/storing that PGXACT's xid and
211-
* related fields.
212-
*
213-
* If there's no room to fit a subtransaction XID into PGPROC, set the
214-
* cache-overflowed flag instead. This forces readers to look in
215-
* pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
216-
* race-condition window, in that the new XID will not appear as running
217-
* until its parent link has been placed into pg_subtrans. However, that
218-
* will happen before anyone could possibly have a reason to inquire about
219-
* the status of the XID, so it seems OK. (Snapshots taken during this
220-
* window *will* include the parent XID, so they will deliver the correct
221-
* answer later on when someone does have a reason to inquire.)
222-
*/
223-
{
224-
/*
225-
* Use volatile pointer to prevent code rearrangement; other backends
226-
* could be examining my subxids info concurrently, and we don't want
227-
* them to see an invalid intermediate state, such as incrementing
228-
* nxids before filling the array entry. Note we are assuming that
229-
* TransactionId and int fetch/store are atomic.
230-
*/
231-
volatile PGPROC *myproc = MyProc;
232-
volatile PGXACT *mypgxact = MyPgXact;
233-
234-
if (!isSubXact)
235-
mypgxact->xid = xid;
236-
else
237-
{
238-
int nxids = mypgxact->nxids;
239-
240-
if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
241-
{
242-
myproc->subxids.xids[nxids] = xid;
243-
mypgxact->nxids = nxids + 1;
244-
}
245-
else
246-
mypgxact->overflowed = true;
247-
}
248-
}
249-
250-
LWLockRelease(XidGenLock);
251-
252-
return xid;
25356
}
25457

25558
/*

src/include/access/transam.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ extern TransactionId TransactionIdLatest(TransactionId mainxid,
169169
extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
170170

171171
/* in transam/varsup.c */
172-
extern TransactionId GetNextTransactionId(void);
172+
extern TransactionId GetNewLocalTransactionId(bool isSubXact);
173173
extern TransactionId GetNewTransactionId(bool isSubXact);
174174
extern TransactionId ReadNewTransactionId(void);
175175
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,

src/include/access/xtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ typedef struct
1919
XidStatus (*GetTransactionStatus)(TransactionId xid, XLogRecPtr *lsn);
2020
void (*SetTransactionStatus)(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
2121
Snapshot (*GetSnapshot)(Snapshot snapshot);
22-
TransactionId (*GetNextXid)();
22+
TransactionId (*GetNewTransactionId)(bool isSubXact);
2323
} TransactionManager;
2424

2525
extern TransactionManager* TM;

0 commit comments

Comments
 (0)