Skip to content

Commit 51e68f0

Browse files
committed
Fix problem with CLOG out of bounds access
1 parent 473f388 commit 51e68f0

File tree

2 files changed

+409
-3
lines changed

2 files changed

+409
-3
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 214 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@
3333
#include "utils/array.h"
3434
#include "utils/builtins.h"
3535
#include "utils/memutils.h"
36+
#include "commands/dbcommands.h"
37+
#include "miscadmin.h"
38+
#include "postmaster/autovacuum.h"
39+
#include "storage/pmsignal.h"
40+
#include "storage/proc.h"
41+
#include "utils/syscache.h"
3642

3743
#include "libdtm.h"
3844

@@ -61,6 +67,7 @@ static void DtmUpdateRecentXmin(void);
6167
static void DtmInitialize(void);
6268
static void DtmXactCallback(XactEvent event, void *arg);
6369
static TransactionId DtmGetNextXid(void);
70+
static TransactionId DtmGetNewTransactionId(bool isSubXact);
6471

6572
static bool TransactionIdIsInDtmSnapshot(TransactionId xid);
6673
static bool TransactionIdIsInDoubt(TransactionId xid);
@@ -77,7 +84,7 @@ static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
7784
static bool DtmHasGlobalSnapshot;
7885
static bool DtmIsGlobalTransaction;
7986
static int DtmLocalXidReserve;
80-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNextXid };
87+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId };
8188

8289

8390
#define XTM_TRACE(fmt, ...)
@@ -190,7 +197,7 @@ static void DtmUpdateRecentXmin(void)
190197

191198
if (TransactionIdIsValid(xmin)) {
192199
xmin -= vacuum_defer_cleanup_age;
193-
xmin = FirstNormalTransactionId;
200+
//xmin = FirstNormalTransactionId;
194201
if (!TransactionIdIsNormal(xmin)) {
195202
xmin = FirstNormalTransactionId;
196203
}
@@ -245,6 +252,211 @@ static TransactionId DtmGetNextXid()
245252
return xid;
246253
}
247254

255+
TransactionId
256+
DtmGetNewTransactionId(bool isSubXact)
257+
{
258+
TransactionId xid;
259+
260+
/*
261+
* Workers synchronize transaction state at the beginning of each parallel
262+
* operation, so we can't account for new XIDs after that point.
263+
*/
264+
if (IsInParallelMode())
265+
elog(ERROR, "cannot assign TransactionIds during a parallel operation");
266+
267+
/*
268+
* During bootstrap initialization, we return the special bootstrap
269+
* transaction id.
270+
*/
271+
if (IsBootstrapProcessingMode())
272+
{
273+
Assert(!isSubXact);
274+
MyPgXact->xid = BootstrapTransactionId;
275+
return BootstrapTransactionId;
276+
}
277+
278+
/* safety check, we should never get this far in a HS slave */
279+
if (RecoveryInProgress())
280+
elog(ERROR, "cannot assign TransactionIds during recovery");
281+
282+
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
283+
xid = DtmGetNextXid();
284+
285+
/*----------
286+
* Check to see if it's safe to assign another XID. This protects against
287+
* catastrophic data loss due to XID wraparound. The basic rules are:
288+
*
289+
* If we're past xidVacLimit, start trying to force autovacuum cycles.
290+
* If we're past xidWarnLimit, start issuing warnings.
291+
* If we're past xidStopLimit, refuse to execute transactions, unless
292+
* we are running in single-user mode (which gives an escape hatch
293+
* to the DBA who somehow got past the earlier defenses).
294+
*
295+
* Note that this coding also appears in GetNewMultiXactId.
296+
*----------
297+
*/
298+
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
299+
{
300+
/*
301+
* For safety's sake, we release XidGenLock while sending signals,
302+
* warnings, etc. This is not so much because we care about
303+
* preserving concurrency in this situation, as to avoid any
304+
* possibility of deadlock while doing get_database_name(). First,
305+
* copy all the shared values we'll need in this path.
306+
*/
307+
TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
308+
TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
309+
TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
310+
Oid oldest_datoid = ShmemVariableCache->oldestXidDB;
311+
312+
LWLockRelease(XidGenLock);
313+
314+
/*
315+
* To avoid swamping the postmaster with signals, we issue the autovac
316+
* request only once per 64K transaction starts. This still gives
317+
* plenty of chances before we get into real trouble.
318+
*/
319+
if (IsUnderPostmaster && (xid % 65536) == 0)
320+
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
321+
322+
if (IsUnderPostmaster &&
323+
TransactionIdFollowsOrEquals(xid, xidStopLimit))
324+
{
325+
char *oldest_datname = get_database_name(oldest_datoid);
326+
327+
/* complain even if that DB has disappeared */
328+
if (oldest_datname)
329+
ereport(ERROR,
330+
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331+
errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
332+
oldest_datname),
333+
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
334+
"You might also need to commit or roll back old prepared transactions.")));
335+
else
336+
ereport(ERROR,
337+
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
338+
errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
339+
oldest_datoid),
340+
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
341+
"You might also need to commit or roll back old prepared transactions.")));
342+
}
343+
else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
344+
{
345+
char *oldest_datname = get_database_name(oldest_datoid);
346+
347+
/* complain even if that DB has disappeared */
348+
if (oldest_datname)
349+
ereport(WARNING,
350+
(errmsg("database \"%s\" must be vacuumed within %u transactions",
351+
oldest_datname,
352+
xidWrapLimit - xid),
353+
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
354+
"You might also need to commit or roll back old prepared transactions.")));
355+
else
356+
ereport(WARNING,
357+
(errmsg("database with OID %u must be vacuumed within %u transactions",
358+
oldest_datoid,
359+
xidWrapLimit - xid),
360+
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
361+
"You might also need to commit or roll back old prepared transactions.")));
362+
}
363+
364+
/* Re-acquire lock and start over */
365+
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
366+
xid = DtmGetNextXid();
367+
}
368+
369+
/*
370+
* If we are allocating the first XID of a new page of the commit log,
371+
* zero out that commit-log page before returning. We must do this while
372+
* holding XidGenLock, else another xact could acquire and commit a later
373+
* XID before we zero the page. Fortunately, a page of the commit log
374+
* holds 32K or more transactions, so we don't have to do this very often.
375+
*
376+
* Extend pg_subtrans and pg_commit_ts too.
377+
*/
378+
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->nextXid)) {
379+
fprintf(stderr, "Extend CLOG to %d\n", xid);
380+
ExtendCLOG(xid);
381+
ExtendCommitTs(xid);
382+
ExtendSUBTRANS(xid);
383+
}
384+
/*
385+
* Now advance the nextXid counter. This must not happen until after we
386+
* have successfully completed ExtendCLOG() --- if that routine fails, we
387+
* want the next incoming transaction to try it again. We cannot assign
388+
* more XIDs until there is CLOG space for them.
389+
*/
390+
if (xid == ShmemVariableCache->nextXid) {
391+
TransactionIdAdvance(ShmemVariableCache->nextXid);
392+
} else {
393+
Assert(TransactionIdPrecedes(xid, ShmemVariableCache->nextXid));
394+
}
395+
396+
/*
397+
* We must store the new XID into the shared ProcArray before releasing
398+
* XidGenLock. This ensures that every active XID older than
399+
* latestCompletedXid is present in the ProcArray, which is essential for
400+
* correct OldestXmin tracking; see src/backend/access/transam/README.
401+
*
402+
* XXX by storing xid into MyPgXact without acquiring ProcArrayLock, we
403+
* are relying on fetch/store of an xid to be atomic, else other backends
404+
* might see a partially-set xid here. But holding both locks at once
405+
* would be a nasty concurrency hit. So for now, assume atomicity.
406+
*
407+
* Note that readers of PGXACT xid fields should be careful to fetch the
408+
* value only once, rather than assume they can read a value multiple
409+
* times and get the same answer each time.
410+
*
411+
* The same comments apply to the subxact xid count and overflow fields.
412+
*
413+
* A solution to the atomic-store problem would be to give each PGXACT its
414+
* own spinlock used only for fetching/storing that PGXACT's xid and
415+
* related fields.
416+
*
417+
* If there's no room to fit a subtransaction XID into PGPROC, set the
418+
* cache-overflowed flag instead. This forces readers to look in
419+
* pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
420+
* race-condition window, in that the new XID will not appear as running
421+
* until its parent link has been placed into pg_subtrans. However, that
422+
* will happen before anyone could possibly have a reason to inquire about
423+
* the status of the XID, so it seems OK. (Snapshots taken during this
424+
* window *will* include the parent XID, so they will deliver the correct
425+
* answer later on when someone does have a reason to inquire.)
426+
*/
427+
{
428+
/*
429+
* Use volatile pointer to prevent code rearrangement; other backends
430+
* could be examining my subxids info concurrently, and we don't want
431+
* them to see an invalid intermediate state, such as incrementing
432+
* nxids before filling the array entry. Note we are assuming that
433+
* TransactionId and int fetch/store are atomic.
434+
*/
435+
volatile PGPROC *myproc = MyProc;
436+
volatile PGXACT *mypgxact = MyPgXact;
437+
438+
if (!isSubXact)
439+
mypgxact->xid = xid;
440+
else
441+
{
442+
int nxids = mypgxact->nxids;
443+
444+
if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
445+
{
446+
myproc->subxids.xids[nxids] = xid;
447+
mypgxact->nxids = nxids + 1;
448+
}
449+
else
450+
mypgxact->overflowed = true;
451+
}
452+
}
453+
454+
LWLockRelease(XidGenLock);
455+
456+
return xid;
457+
}
458+
459+
248460
static Snapshot DtmGetSnapshot(Snapshot snapshot)
249461
{
250462

0 commit comments

Comments
 (0)