Skip to content

Commit ed0b409

Browse files
committed
Move "hot" members of PGPROC into a separate PGXACT array.
This speeds up snapshot-taking and reduces ProcArrayLock contention. Also, the PGPROC (and PGXACT) structures used by two-phase commit are now allocated as part of the main array, rather than in a separate array, and we keep ProcArray sorted in pointer order. These changes are intended to minimize the number of cache lines that must be pulled in to take a snapshot, and testing shows a substantial increase in performance on both read and write workloads at high concurrencies. Pavan Deolasee, Heikki Linnakangas, Robert Haas
1 parent 9ed439a commit ed0b409

File tree

14 files changed

+356
-208
lines changed

14 files changed

+356
-208
lines changed

src/backend/access/transam/twophase.c

Lines changed: 69 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ int max_prepared_xacts = 0;
113113

114114
typedef struct GlobalTransactionData
115115
{
116-
PGPROC proc; /* dummy proc */
116+
GlobalTransaction next;
117+
int pgprocno; /* dummy proc */
117118
BackendId dummyBackendId; /* similar to backend id for backends */
118119
TimestampTz prepared_at; /* time of preparation */
119120
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
@@ -207,7 +208,8 @@ TwoPhaseShmemInit(void)
207208
sizeof(GlobalTransaction) * max_prepared_xacts));
208209
for (i = 0; i < max_prepared_xacts; i++)
209210
{
210-
gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
211+
gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
212+
gxacts[i].next = TwoPhaseState->freeGXacts;
211213
TwoPhaseState->freeGXacts = &gxacts[i];
212214

213215
/*
@@ -243,6 +245,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
243245
TimestampTz prepared_at, Oid owner, Oid databaseid)
244246
{
245247
GlobalTransaction gxact;
248+
PGPROC *proc;
249+
PGXACT *pgxact;
246250
int i;
247251

248252
if (strlen(gid) >= GIDSIZE)
@@ -274,7 +278,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
274278
TwoPhaseState->numPrepXacts--;
275279
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
276280
/* and put it back in the freelist */
277-
gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
281+
gxact->next = TwoPhaseState->freeGXacts;
278282
TwoPhaseState->freeGXacts = gxact;
279283
/* Back up index count too, so we don't miss scanning one */
280284
i--;
@@ -302,32 +306,36 @@ MarkAsPreparing(TransactionId xid, const char *gid,
302306
errhint("Increase max_prepared_transactions (currently %d).",
303307
max_prepared_xacts)));
304308
gxact = TwoPhaseState->freeGXacts;
305-
TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
309+
TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next;
306310

307-
/* Initialize it */
308-
MemSet(&gxact->proc, 0, sizeof(PGPROC));
309-
SHMQueueElemInit(&(gxact->proc.links));
310-
gxact->proc.waitStatus = STATUS_OK;
311+
proc = &ProcGlobal->allProcs[gxact->pgprocno];
312+
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
313+
314+
/* Initialize the PGPROC entry */
315+
MemSet(proc, 0, sizeof(PGPROC));
316+
proc->pgprocno = gxact->pgprocno;
317+
SHMQueueElemInit(&(proc->links));
318+
proc->waitStatus = STATUS_OK;
311319
/* We set up the gxact's VXID as InvalidBackendId/XID */
312-
gxact->proc.lxid = (LocalTransactionId) xid;
313-
gxact->proc.xid = xid;
314-
gxact->proc.xmin = InvalidTransactionId;
315-
gxact->proc.pid = 0;
316-
gxact->proc.backendId = InvalidBackendId;
317-
gxact->proc.databaseId = databaseid;
318-
gxact->proc.roleId = owner;
319-
gxact->proc.inCommit = false;
320-
gxact->proc.vacuumFlags = 0;
321-
gxact->proc.lwWaiting = false;
322-
gxact->proc.lwExclusive = false;
323-
gxact->proc.lwWaitLink = NULL;
324-
gxact->proc.waitLock = NULL;
325-
gxact->proc.waitProcLock = NULL;
320+
proc->lxid = (LocalTransactionId) xid;
321+
pgxact->xid = xid;
322+
pgxact->xmin = InvalidTransactionId;
323+
pgxact->inCommit = false;
324+
pgxact->vacuumFlags = 0;
325+
proc->pid = 0;
326+
proc->backendId = InvalidBackendId;
327+
proc->databaseId = databaseid;
328+
proc->roleId = owner;
329+
proc->lwWaiting = false;
330+
proc->lwExclusive = false;
331+
proc->lwWaitLink = NULL;
332+
proc->waitLock = NULL;
333+
proc->waitProcLock = NULL;
326334
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
327-
SHMQueueInit(&(gxact->proc.myProcLocks[i]));
335+
SHMQueueInit(&(proc->myProcLocks[i]));
328336
/* subxid data must be filled later by GXactLoadSubxactData */
329-
gxact->proc.subxids.overflowed = false;
330-
gxact->proc.subxids.nxids = 0;
337+
pgxact->overflowed = false;
338+
pgxact->nxids = 0;
331339

332340
gxact->prepared_at = prepared_at;
333341
/* initialize LSN to 0 (start of WAL) */
@@ -358,17 +366,19 @@ static void
358366
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
359367
TransactionId *children)
360368
{
369+
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
370+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
361371
/* We need no extra lock since the GXACT isn't valid yet */
362372
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
363373
{
364-
gxact->proc.subxids.overflowed = true;
374+
pgxact->overflowed = true;
365375
nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
366376
}
367377
if (nsubxacts > 0)
368378
{
369-
memcpy(gxact->proc.subxids.xids, children,
379+
memcpy(proc->subxids.xids, children,
370380
nsubxacts * sizeof(TransactionId));
371-
gxact->proc.subxids.nxids = nsubxacts;
381+
pgxact->nxids = nsubxacts;
372382
}
373383
}
374384

@@ -389,7 +399,7 @@ MarkAsPrepared(GlobalTransaction gxact)
389399
* Put it into the global ProcArray so TransactionIdIsInProgress considers
390400
* the XID as still running.
391401
*/
392-
ProcArrayAdd(&gxact->proc);
402+
ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
393403
}
394404

395405
/*
@@ -406,6 +416,7 @@ LockGXact(const char *gid, Oid user)
406416
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
407417
{
408418
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
419+
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
409420

410421
/* Ignore not-yet-valid GIDs */
411422
if (!gxact->valid)
@@ -436,7 +447,7 @@ LockGXact(const char *gid, Oid user)
436447
* there may be some other issues as well. Hence disallow until
437448
* someone gets motivated to make it work.
438449
*/
439-
if (MyDatabaseId != gxact->proc.databaseId)
450+
if (MyDatabaseId != proc->databaseId)
440451
ereport(ERROR,
441452
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
442453
errmsg("prepared transaction belongs to another database"),
@@ -483,7 +494,7 @@ RemoveGXact(GlobalTransaction gxact)
483494
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
484495

485496
/* and put it back in the freelist */
486-
gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
497+
gxact->next = TwoPhaseState->freeGXacts;
487498
TwoPhaseState->freeGXacts = gxact;
488499

489500
LWLockRelease(TwoPhaseStateLock);
@@ -518,8 +529,9 @@ TransactionIdIsPrepared(TransactionId xid)
518529
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
519530
{
520531
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
532+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
521533

522-
if (gxact->valid && gxact->proc.xid == xid)
534+
if (gxact->valid && pgxact->xid == xid)
523535
{
524536
result = true;
525537
break;
@@ -642,6 +654,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
642654
while (status->array != NULL && status->currIdx < status->ngxacts)
643655
{
644656
GlobalTransaction gxact = &status->array[status->currIdx++];
657+
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
658+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
645659
Datum values[5];
646660
bool nulls[5];
647661
HeapTuple tuple;
@@ -656,11 +670,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
656670
MemSet(values, 0, sizeof(values));
657671
MemSet(nulls, 0, sizeof(nulls));
658672

659-
values[0] = TransactionIdGetDatum(gxact->proc.xid);
673+
values[0] = TransactionIdGetDatum(pgxact->xid);
660674
values[1] = CStringGetTextDatum(gxact->gid);
661675
values[2] = TimestampTzGetDatum(gxact->prepared_at);
662676
values[3] = ObjectIdGetDatum(gxact->owner);
663-
values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
677+
values[4] = ObjectIdGetDatum(proc->databaseId);
664678

665679
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
666680
result = HeapTupleGetDatum(tuple);
@@ -711,10 +725,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
711725
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
712726
{
713727
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
728+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
714729

715-
if (gxact->proc.xid == xid)
730+
if (pgxact->xid == xid)
716731
{
717-
result = &gxact->proc;
732+
result = &ProcGlobal->allProcs[gxact->pgprocno];
718733
break;
719734
}
720735
}
@@ -841,7 +856,9 @@ save_state_data(const void *data, uint32 len)
841856
void
842857
StartPrepare(GlobalTransaction gxact)
843858
{
844-
TransactionId xid = gxact->proc.xid;
859+
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
860+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
861+
TransactionId xid = pgxact->xid;
845862
TwoPhaseFileHeader hdr;
846863
TransactionId *children;
847864
RelFileNode *commitrels;
@@ -865,7 +882,7 @@ StartPrepare(GlobalTransaction gxact)
865882
hdr.magic = TWOPHASE_MAGIC;
866883
hdr.total_len = 0; /* EndPrepare will fill this in */
867884
hdr.xid = xid;
868-
hdr.database = gxact->proc.databaseId;
885+
hdr.database = proc->databaseId;
869886
hdr.prepared_at = gxact->prepared_at;
870887
hdr.owner = gxact->owner;
871888
hdr.nsubxacts = xactGetCommittedChildren(&children);
@@ -913,7 +930,8 @@ StartPrepare(GlobalTransaction gxact)
913930
void
914931
EndPrepare(GlobalTransaction gxact)
915932
{
916-
TransactionId xid = gxact->proc.xid;
933+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
934+
TransactionId xid = pgxact->xid;
917935
TwoPhaseFileHeader *hdr;
918936
char path[MAXPGPATH];
919937
XLogRecData *record;
@@ -1021,7 +1039,7 @@ EndPrepare(GlobalTransaction gxact)
10211039
*/
10221040
START_CRIT_SECTION();
10231041

1024-
MyProc->inCommit = true;
1042+
MyPgXact->inCommit = true;
10251043

10261044
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
10271045
records.head);
@@ -1069,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact)
10691087
* checkpoint starting after this will certainly see the gxact as a
10701088
* candidate for fsyncing.
10711089
*/
1072-
MyProc->inCommit = false;
1090+
MyPgXact->inCommit = false;
10731091

10741092
END_CRIT_SECTION();
10751093

@@ -1242,6 +1260,8 @@ void
12421260
FinishPreparedTransaction(const char *gid, bool isCommit)
12431261
{
12441262
GlobalTransaction gxact;
1263+
PGPROC *proc;
1264+
PGXACT *pgxact;
12451265
TransactionId xid;
12461266
char *buf;
12471267
char *bufptr;
@@ -1260,7 +1280,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
12601280
* try to commit the same GID at once.
12611281
*/
12621282
gxact = LockGXact(gid, GetUserId());
1263-
xid = gxact->proc.xid;
1283+
proc = &ProcGlobal->allProcs[gxact->pgprocno];
1284+
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1285+
xid = pgxact->xid;
12641286

12651287
/*
12661288
* Read and validate the state file
@@ -1309,7 +1331,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13091331
hdr->nsubxacts, children,
13101332
hdr->nabortrels, abortrels);
13111333

1312-
ProcArrayRemove(&gxact->proc, latestXid);
1334+
ProcArrayRemove(proc, latestXid);
13131335

13141336
/*
13151337
* In case we fail while running the callbacks, mark the gxact invalid so
@@ -1540,10 +1562,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
15401562
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
15411563
{
15421564
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1565+
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
15431566

15441567
if (gxact->valid &&
15451568
XLByteLE(gxact->prepare_lsn, redo_horizon))
1546-
xids[nxids++] = gxact->proc.xid;
1569+
xids[nxids++] = pgxact->xid;
15471570
}
15481571

15491572
LWLockRelease(TwoPhaseStateLock);
@@ -1972,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
19721995
START_CRIT_SECTION();
19731996

19741997
/* See notes in RecordTransactionCommit */
1975-
MyProc->inCommit = true;
1998+
MyPgXact->inCommit = true;
19761999

19772000
/* Emit the XLOG commit record */
19782001
xlrec.xid = xid;
@@ -2037,7 +2060,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
20372060
TransactionIdCommitTree(xid, nchildren, children);
20382061

20392062
/* Checkpoint can proceed now */
2040-
MyProc->inCommit = false;
2063+
MyPgXact->inCommit = false;
20412064

20422065
END_CRIT_SECTION();
20432066

src/backend/access/transam/varsup.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ GetNewTransactionId(bool isSubXact)
5454
if (IsBootstrapProcessingMode())
5555
{
5656
Assert(!isSubXact);
57-
MyProc->xid = BootstrapTransactionId;
57+
MyPgXact->xid = BootstrapTransactionId;
5858
return BootstrapTransactionId;
5959
}
6060

@@ -208,20 +208,21 @@ GetNewTransactionId(bool isSubXact)
208208
* TransactionId and int fetch/store are atomic.
209209
*/
210210
volatile PGPROC *myproc = MyProc;
211+
volatile PGXACT *mypgxact = MyPgXact;
211212

212213
if (!isSubXact)
213-
myproc->xid = xid;
214+
mypgxact->xid = xid;
214215
else
215216
{
216-
int nxids = myproc->subxids.nxids;
217+
int nxids = mypgxact->nxids;
217218

218219
if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
219220
{
220221
myproc->subxids.xids[nxids] = xid;
221-
myproc->subxids.nxids = nxids + 1;
222+
mypgxact->nxids = nxids + 1;
222223
}
223224
else
224-
myproc->subxids.overflowed = true;
225+
mypgxact->overflowed = true;
225226
}
226227
}
227228

src/backend/access/transam/xact.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ RecordTransactionCommit(void)
981981
* bit fuzzy, but it doesn't matter.
982982
*/
983983
START_CRIT_SECTION();
984-
MyProc->inCommit = true;
984+
MyPgXact->inCommit = true;
985985

986986
SetCurrentTransactionStopTimestamp();
987987

@@ -1155,7 +1155,7 @@ RecordTransactionCommit(void)
11551155
*/
11561156
if (markXidCommitted)
11571157
{
1158-
MyProc->inCommit = false;
1158+
MyPgXact->inCommit = false;
11591159
END_CRIT_SECTION();
11601160
}
11611161

src/backend/commands/analyze.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
223223
* OK, let's do it. First let other backends know I'm in ANALYZE.
224224
*/
225225
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
226-
MyProc->vacuumFlags |= PROC_IN_ANALYZE;
226+
MyPgXact->vacuumFlags |= PROC_IN_ANALYZE;
227227
LWLockRelease(ProcArrayLock);
228228

229229
/*
@@ -250,7 +250,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
250250
* because the vacuum flag is cleared by the end-of-xact code.
251251
*/
252252
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
253-
MyProc->vacuumFlags &= ~PROC_IN_ANALYZE;
253+
MyPgXact->vacuumFlags &= ~PROC_IN_ANALYZE;
254254
LWLockRelease(ProcArrayLock);
255255
}
256256

src/backend/commands/vacuum.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -893,9 +893,9 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound)
893893
* which is probably Not Good.
894894
*/
895895
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
896-
MyProc->vacuumFlags |= PROC_IN_VACUUM;
896+
MyPgXact->vacuumFlags |= PROC_IN_VACUUM;
897897
if (for_wraparound)
898-
MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
898+
MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
899899
LWLockRelease(ProcArrayLock);
900900
}
901901

0 commit comments

Comments
 (0)