Skip to content

Commit 921dd41

Browse files
committed
be more accurate about locks and pfree's
1 parent 46c55e7 commit 921dd41

File tree

1 file changed

+29
-102
lines changed

1 file changed

+29
-102
lines changed

src/backend/access/transam/twophase.c

Lines changed: 29 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -990,8 +990,6 @@ StartPrepare(GlobalTransaction gxact)
990990

991991
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
992992

993-
// fprintf(stderr, "StartPrepare: %s=(%d,%d,%d,%d)\n", hdr.gid, hdr.nsubxacts, hdr.ncommitrels, hdr.nabortrels, hdr.ninvalmsgs);
994-
995993
/*
996994
* Add the additional info about subxacts, deletable files and cache
997995
* invalidation messages.
@@ -1108,11 +1106,6 @@ EndPrepare(GlobalTransaction gxact)
11081106

11091107
END_CRIT_SECTION();
11101108

1111-
1112-
fprintf(stderr, "EndPrepare: %s=(%d,%d,%d,%d,%d)\n", gxact->gid, hdr->xid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1113-
fprintf(stderr, "EndPrepare: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1114-
1115-
11161109
/*
11171110
* Wait for synchronous replication, if required.
11181111
*
@@ -1169,7 +1162,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
11691162
if (give_warnings)
11701163
ereport(WARNING,
11711164
(errcode_for_file_access(),
1172-
errmsg("ReadTwoPhaseFile: could not open two-phase state file \"%s\": %m",
1165+
errmsg("could not open two-phase state file \"%s\": %m",
11731166
path)));
11741167
return NULL;
11751168
}
@@ -1297,6 +1290,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
12971290
int ndelrels;
12981291
SharedInvalidationMessage *invalmsgs;
12991292
int i;
1293+
bool file_used = false;
13001294

13011295
/*
13021296
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1308,12 +1302,22 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13081302
xid = pgxact->xid;
13091303

13101304
/*
1311-
* Read and validate the state file
1305+
* Read and validate 2PC state data.
1306+
* NB: Here we can face the situation where checkpoint can happend
1307+
* between condition check and xlog read. To prevent that I'm holding
1308+
* delayChkpt. Other possible scenario is try to read xlog and if it fails
1309+
* try to read file.
13121310
*/
1313-
if (gxact->prepare_lsn <= GetRedoRecPtr())
1311+
MyPgXact->delayChkpt = true;
1312+
if (gxact->prepare_lsn <= GetRedoRecPtr()){
13141313
buf = ReadTwoPhaseFile(xid, true);
1314+
file_used = true;
1315+
}
13151316
else
1317+
{
13161318
XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL);
1319+
}
1320+
MyPgXact->delayChkpt = false;
13171321

13181322
/*
13191323
* Disassemble the header area
@@ -1333,14 +1337,6 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13331337
/* compute latestXid among all children */
13341338
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
13351339

1336-
fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d,%d)\n", gxact->gid, hdr->xid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1337-
fprintf(stderr, "FinishPrepared: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1338-
1339-
Assert(hdr->nsubxacts == 0);
1340-
Assert(hdr->ncommitrels == 0);
1341-
Assert(hdr->nabortrels == 0);
1342-
Assert(hdr->ninvalmsgs == 0);
1343-
13441340
/*
13451341
* The order of operations here is critical: make the XLOG entry for
13461342
* commit or abort, then mark the transaction committed or aborted in
@@ -1423,12 +1419,13 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14231419
/*
14241420
* And now we can clean up our mess.
14251421
*/
1426-
// RemoveTwoPhaseFile(xid, true);
1422+
if (file_used)
1423+
RemoveTwoPhaseFile(xid, true);
14271424

14281425
RemoveGXact(gxact);
14291426
MyLockedGxact = NULL;
14301427

1431-
// pfree(buf);
1428+
pfree(buf);
14321429
}
14331430

14341431
/*
@@ -1489,8 +1486,6 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
14891486
pg_crc32c statefile_crc;
14901487
int fd;
14911488

1492-
fprintf(stderr, "RecreateTwoPhaseFile called xid=%d, len=%d\n", xid, len);
1493-
14941489
/* Recompute CRC */
14951490
INIT_CRC32C(statefile_crc);
14961491
COMP_CRC32C(statefile_crc, content, len);
@@ -1561,99 +1556,30 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
15611556
void
15621557
CheckPointTwoPhase(XLogRecPtr redo_horizon)
15631558
{
1564-
TransactionId *xids;
1565-
XLogRecPtr *xlogptrs;
1566-
int nxids;
1567-
char path[MAXPGPATH];
15681559
int i;
1560+
int len;
1561+
char *buf;
15691562

1570-
/*
1571-
* We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1572-
* it just long enough to make a list of the XIDs that require fsyncing,
1573-
* and then do the I/O afterwards.
1574-
*
1575-
* This approach creates a race condition: someone else could delete a
1576-
* GXACT between the time we release TwoPhaseStateLock and the time we try
1577-
* to open its state file. We handle this by special-casing ENOENT
1578-
* failures: if we see that, we verify that the GXACT is no longer valid,
1579-
* and if so ignore the failure.
1580-
*/
15811563
if (max_prepared_xacts <= 0)
15821564
return; /* nothing to do */
15831565

15841566
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
15851567

1586-
xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1587-
xlogptrs = (XLogRecPtr *) palloc(max_prepared_xacts * sizeof(XLogRecPtr));
1588-
nxids = 0;
1589-
15901568
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
15911569

15921570
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
15931571
{
15941572
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
15951573
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1596-
int j;
15971574

15981575
if (gxact->valid && gxact->prepare_lsn <= redo_horizon){
1599-
j = nxids++;
1600-
xids[j] = pgxact->xid;
1601-
xlogptrs[j] = gxact->prepare_xlogptr;
1576+
XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, &len);
1577+
RecreateTwoPhaseFile(pgxact->xid, buf, len);
16021578
}
1603-
16041579
}
16051580

16061581
LWLockRelease(TwoPhaseStateLock);
16071582

1608-
for (i = 0; i < nxids; i++)
1609-
{
1610-
TransactionId xid = xids[i];
1611-
int fd;
1612-
int len;
1613-
char *buf;
1614-
1615-
TwoPhaseFilePath(path, xid);
1616-
1617-
fprintf(stderr, "CheckPointTwoPhase: %lX\n", xlogptrs[i]);
1618-
1619-
fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1620-
1621-
if (fd < 0 && errno == ENOENT)
1622-
{
1623-
fprintf(stderr, "CheckPointTwoPhase: %d <-> %d \n", errno, ENOENT);
1624-
1625-
/* OK if gxact is no longer valid */
1626-
if (!TransactionIdIsPrepared(xid))
1627-
continue;
1628-
1629-
/* Re-create file */
1630-
XlogReadTwoPhaseData(xlogptrs[i], &buf, &len);
1631-
RecreateTwoPhaseFile(xid, buf, len);
1632-
fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1633-
1634-
if (fd < 0)
1635-
ereport(ERROR,
1636-
(errcode_for_file_access(),
1637-
errmsg("CheckPointTwoPhase: could not open two-phase state file after re-creating \"%s\": %m",
1638-
path)));
1639-
}
1640-
else if (fd < 0)
1641-
{
1642-
ereport(ERROR,
1643-
(errcode_for_file_access(),
1644-
errmsg("CheckPointTwoPhase: could not open two-phase state file \"%s\": %m",
1645-
path)));
1646-
}
1647-
1648-
if (CloseTransientFile(fd) != 0)
1649-
ereport(ERROR,
1650-
(errcode_for_file_access(),
1651-
errmsg("could not close two-phase state file \"%s\": %m",
1652-
path)));
1653-
}
1654-
1655-
pfree(xids);
1656-
16571583
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
16581584
}
16591585

@@ -2177,19 +2103,20 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
21772103

21782104
xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
21792105
if (xlogreader == NULL)
2180-
fprintf(stderr, "xlogreader == NULL\n");
2106+
elog(ERROR, "failed to open xlogreader for reading 2PC data");
21812107

21822108
record = XLogReadRecord(xlogreader, lsn, &errormsg);
21832109

21842110
if (record == NULL)
2185-
fprintf(stderr, "XLogReadRecord error\n");
2111+
elog(ERROR, "failed to find 2PC data in xlog");
21862112

21872113
if (len != NULL)
21882114
*len = XLogRecGetDataLen(xlogreader);
2189-
*buf = XLogRecGetData(xlogreader);
2190-
}
2191-
2192-
2193-
2115+
else
2116+
elog(ERROR, "failed to read 2PC data from xlog: xore length");
21942117

2118+
*buf = palloc(sizeof(char)*(*len));
2119+
memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*(*len));
21952120

2121+
XLogReaderFree(xlogreader);
2122+
}

0 commit comments

Comments
 (0)