Skip to content

Commit 491dab8

Browse files
committed
Set prepare_xlogptr to NULL after checkpoint where that gxact was moved to file. Later we can use NULL value in xlogptr as a flag that our data stored in file
1 parent 921dd41 commit 491dab8

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

src/backend/access/transam/twophase.c

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ typedef struct GlobalTransactionData
121121
BackendId dummyBackendId; /* similar to backend id for backends */
122122
TimestampTz prepared_at; /* time of preparation */
123123
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record end */
124-
XLogRecPtr prepare_xlogptr; /* XLOG offset of prepare record start */
124+
XLogRecPtr prepare_xlogptr; /* XLOG offset of prepare record start
125+
* or NULL if twophase data moved to file
126+
* after checkpoint.
127+
*/
125128
Oid owner; /* ID of user that executed the xact */
126129
BackendId locking_backend; /* backend currently working on the xact */
127130
bool valid; /* TRUE if PGPROC entry is in proc array */
@@ -1303,21 +1306,23 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13031306

13041307
/*
13051308
* Read and validate 2PC state data.
1306-
* NB: Here we can face the situation where checkpoint can happend
1307-
* between condition check and xlog read. To prevent that I'm holding
1308-
* delayChkpt. Other possible scenario is try to read xlog and if it fails
1309-
* try to read file.
1309+
* State data can be stored in xlog or files depending on checkpoint
1310+
* status. One way to read that data is to delay checkpoint (delayChkpt) and
1311+
* compare gxact->prepare_lsn with current xlog horizon. But having in mind
1312+
* that most of 2PC transactions will be commited right after prepare, we
1313+
* can just try to read xlog and in case of error read file. Also that is
1314+
* happening under LockGXact, so nobody can commit our transaction between
1315+
* xlog and file reads.
13101316
*/
1311-
MyPgXact->delayChkpt = true;
1312-
if (gxact->prepare_lsn <= GetRedoRecPtr()){
1313-
buf = ReadTwoPhaseFile(xid, true);
1314-
file_used = true;
1317+
if (gxact->prepare_lsn)
1318+
{
1319+
XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL);
13151320
}
13161321
else
13171322
{
1318-
XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL);
1323+
buf = ReadTwoPhaseFile(xid, true);
1324+
file_used = true;
13191325
}
1320-
MyPgXact->delayChkpt = false;
13211326

13221327
/*
13231328
* Disassemble the header area
@@ -1560,24 +1565,35 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
15601565
int len;
15611566
char *buf;
15621567

1568+
fprintf(stderr, "=== Checkpoint: redo_horizon=%lX\n", redo_horizon);
1569+
15631570
if (max_prepared_xacts <= 0)
15641571
return; /* nothing to do */
15651572

15661573
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
15671574

1575+
/*
1576+
* Here we doing whole I/O while holding TwoPhaseStateLock.
1577+
* It's also possible to move I/O out of the lock, but on
1578+
* every error we should check whether somebody commited our
1579+
* transaction in different backend. Let's leave this optimisation
1580+
* for future, if somebody will spot that this place cause
1581+
* bottleneck.
1582+
*
1583+
*/
15681584
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1569-
15701585
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
15711586
{
15721587
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
15731588
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
15741589

1575-
if (gxact->valid && gxact->prepare_lsn <= redo_horizon){
1590+
if (gxact->valid && gxact->prepare_lsn && gxact->prepare_lsn <= redo_horizon){
15761591
XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, &len);
15771592
RecreateTwoPhaseFile(pgxact->xid, buf, len);
1593+
gxact->prepare_lsn = (XLogRecPtr) NULL;
1594+
pfree(buf);
15781595
}
15791596
}
1580-
15811597
LWLockRelease(TwoPhaseStateLock);
15821598

15831599
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
@@ -2094,7 +2110,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
20942110

20952111
/**********************************************************************************/
20962112

2097-
void
2113+
static void
20982114
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
20992115
{
21002116
XLogRecord *record;
@@ -2106,17 +2122,14 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
21062122
elog(ERROR, "failed to open xlogreader for reading 2PC data");
21072123

21082124
record = XLogReadRecord(xlogreader, lsn, &errormsg);
2109-
21102125
if (record == NULL)
2111-
elog(ERROR, "failed to find 2PC data in xlog");
2126+
elog(ERROR, "failed to read 2PC record from xlog");
21122127

21132128
if (len != NULL)
21142129
*len = XLogRecGetDataLen(xlogreader);
2115-
else
2116-
elog(ERROR, "failed to read 2PC data from xlog: xore length");
21172130

2118-
*buf = palloc(sizeof(char)*(*len));
2119-
memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*(*len));
2131+
*buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
2132+
memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*XLogRecGetDataLen(xlogreader));
21202133

21212134
XLogReaderFree(xlogreader);
21222135
}

0 commit comments

Comments
 (0)