Skip to content

Commit d37e2cd

Browse files
committed
finally working xlog insert/read
1 parent 8d791a5 commit d37e2cd

File tree

3 files changed

+97
-142
lines changed

3 files changed

+97
-142
lines changed

src/backend/access/transam/twophase.c

Lines changed: 95 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174174

175175
static char twophase_buf[10*1024];
176176
static int twophase_pos = 0;
177-
size_t bogus_write(int fd, char *buf, size_t nbytes);
178-
177+
size_t bogus_write(int fd, const void *buf, size_t nbytes);
179178
static char *XlogReadTwoPhaseData(XLogRecPtr lsn);
180-
// LWLock *xlogreclock;
181179

182180
/*
183181
* Initialization of shared memory
@@ -997,6 +995,8 @@ StartPrepare(GlobalTransaction gxact)
997995

998996
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
999997

998+
// fprintf(stderr, "StartPrepare: %s=(%d,%d,%d,%d)\n", hdr.gid, hdr.nsubxacts, hdr.ncommitrels, hdr.nabortrels, hdr.ninvalmsgs);
999+
10001000
/*
10011001
* Add the additional info about subxacts, deletable files and cache
10021002
* invalidation messages.
@@ -1033,13 +1033,13 @@ StartPrepare(GlobalTransaction gxact)
10331033
void
10341034
EndPrepare(GlobalTransaction gxact)
10351035
{
1036-
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037-
TransactionId xid = pgxact->xid;
1036+
// PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037+
// TransactionId xid = pgxact->xid;
10381038
TwoPhaseFileHeader *hdr;
10391039
char path[MAXPGPATH];
10401040
StateFileChunk *record;
10411041
pg_crc32c statefile_crc;
1042-
pg_crc32c bogus_crc;
1042+
// pg_crc32c bogus_crc;
10431043
int fd;
10441044

10451045
/* Add the end sentinel to the list of 2PC records */
@@ -1144,26 +1144,13 @@ EndPrepare(GlobalTransaction gxact)
11441144
MyPgXact->delayChkpt = true;
11451145

11461146
XLogBeginInsert();
1147-
11481147
for (record = records.head; record != NULL; record = record->next)
11491148
XLogRegisterData(record->data, record->len);
1150-
1151-
// LWLockAcquire(xlogreclock, LW_EXCLUSIVE);
1152-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1153-
gxact->prepare_xlogptr = GetXLogInsertRecPtr();
11541149
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1155-
LWLockRelease(TwoPhaseStateLock);
1156-
// LWLockRelease(xlogreclock);
1157-
1158-
11591150
XLogFlush(gxact->prepare_lsn);
1151+
gxact->prepare_xlogptr = ProcLastRecPtr;
11601152

1161-
1162-
// fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163-
// gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164-
// fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165-
// gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1166-
1153+
// fprintf(stderr, "EndPrepare: %s={xlogptr:%X,lsn:%X, delta: %X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
11671154

11681155
/* If we crash now, we have prepared: WAL replay will fix things */
11691156

@@ -1250,101 +1237,100 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
12501237
static char *
12511238
ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
12521239
{
1253-
// char path[MAXPGPATH];
1254-
// char *buf;
1255-
// TwoPhaseFileHeader *hdr;
1256-
// int fd;
1257-
// struct stat stat;
1258-
// uint32 crc_offset;
1259-
// pg_crc32c calc_crc,
1260-
// file_crc;
1261-
1262-
// TwoPhaseFilePath(path, xid);
1240+
char path[MAXPGPATH];
1241+
char *buf;
1242+
TwoPhaseFileHeader *hdr;
1243+
int fd;
1244+
struct stat stat;
1245+
uint32 crc_offset;
1246+
pg_crc32c calc_crc,
1247+
file_crc;
12631248

1264-
// fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1249+
TwoPhaseFilePath(path, xid);
12651250

1266-
// if (fd < 0)
1267-
// {
1268-
// if (give_warnings)
1269-
// ereport(WARNING,
1270-
// (errcode_for_file_access(),
1271-
// errmsg("could not open two-phase state file \"%s\": %m",
1272-
// path)));
1273-
// return NULL;
1274-
// }
1251+
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1252+
if (fd < 0)
1253+
{
1254+
if (give_warnings)
1255+
ereport(WARNING,
1256+
(errcode_for_file_access(),
1257+
errmsg("could not open two-phase state file \"%s\": %m",
1258+
path)));
1259+
return NULL;
1260+
}
12751261

12761262
/*
12771263
* Check file length. We can determine a lower bound pretty easily. We
12781264
* set an upper bound to avoid palloc() failure on a corrupt file, though
12791265
* we can't guarantee that we won't get an out of memory error anyway,
12801266
* even on a valid file.
12811267
*/
1282-
// if (fstat(fd, &stat))
1283-
// {
1284-
// CloseTransientFile(fd);
1285-
// if (give_warnings)
1286-
// ereport(WARNING,
1287-
// (errcode_for_file_access(),
1288-
// errmsg("could not stat two-phase state file \"%s\": %m",
1289-
// path)));
1290-
// return NULL;
1291-
// }
1268+
if (fstat(fd, &stat))
1269+
{
1270+
CloseTransientFile(fd);
1271+
if (give_warnings)
1272+
ereport(WARNING,
1273+
(errcode_for_file_access(),
1274+
errmsg("could not stat two-phase state file \"%s\": %m",
1275+
path)));
1276+
return NULL;
1277+
}
12921278

1293-
// if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1294-
// MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1295-
// sizeof(pg_crc32c)) ||
1296-
// stat.st_size > MaxAllocSize)
1297-
// {
1298-
// CloseTransientFile(fd);
1299-
// return NULL;
1300-
// }
1279+
if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1280+
MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1281+
sizeof(pg_crc32c)) ||
1282+
stat.st_size > MaxAllocSize)
1283+
{
1284+
CloseTransientFile(fd);
1285+
return NULL;
1286+
}
13011287

1302-
// crc_offset = stat.st_size - sizeof(pg_crc32c);
1303-
// if (crc_offset != MAXALIGN(crc_offset))
1304-
// {
1305-
// CloseTransientFile(fd);
1306-
// return NULL;
1307-
// }
1288+
crc_offset = stat.st_size - sizeof(pg_crc32c);
1289+
if (crc_offset != MAXALIGN(crc_offset))
1290+
{
1291+
CloseTransientFile(fd);
1292+
return NULL;
1293+
}
13081294

1309-
// /*
1310-
// * OK, slurp in the file.
1311-
// */
1312-
// buf = (char *) palloc(stat.st_size);
1295+
/*
1296+
* OK, slurp in the file.
1297+
*/
1298+
buf = (char *) palloc(stat.st_size);
13131299

1314-
// if (read(fd, buf, stat.st_size) != stat.st_size)
1315-
// {
1316-
// CloseTransientFile(fd);
1317-
// if (give_warnings)
1318-
// ereport(WARNING,
1319-
// (errcode_for_file_access(),
1320-
// errmsg("could not read two-phase state file \"%s\": %m",
1321-
// path)));
1322-
// pfree(buf);
1323-
// return NULL;
1324-
// }
1300+
if (read(fd, buf, stat.st_size) != stat.st_size)
1301+
{
1302+
CloseTransientFile(fd);
1303+
if (give_warnings)
1304+
ereport(WARNING,
1305+
(errcode_for_file_access(),
1306+
errmsg("could not read two-phase state file \"%s\": %m",
1307+
path)));
1308+
pfree(buf);
1309+
return NULL;
1310+
}
13251311

1326-
// CloseTransientFile(fd);
1312+
CloseTransientFile(fd);
13271313

1328-
// hdr = (TwoPhaseFileHeader *) buf;
1329-
// if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1330-
// {
1331-
// pfree(buf);
1332-
// return NULL;
1333-
// }
1314+
hdr = (TwoPhaseFileHeader *) buf;
1315+
if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1316+
{
1317+
pfree(buf);
1318+
return NULL;
1319+
}
13341320

1335-
// INIT_CRC32C(calc_crc);
1336-
// COMP_CRC32C(calc_crc, buf, crc_offset);
1337-
// FIN_CRC32C(calc_crc);
1321+
INIT_CRC32C(calc_crc);
1322+
COMP_CRC32C(calc_crc, buf, crc_offset);
1323+
FIN_CRC32C(calc_crc);
13381324

1339-
// file_crc = *((pg_crc32c *) (buf + crc_offset));
1325+
file_crc = *((pg_crc32c *) (buf + crc_offset));
13401326

1341-
// if (!EQ_CRC32C(calc_crc, file_crc))
1342-
// {
1343-
// pfree(buf);
1344-
// return NULL;
1345-
// }
1327+
if (!EQ_CRC32C(calc_crc, file_crc))
1328+
{
1329+
pfree(buf);
1330+
return NULL;
1331+
}
13461332

1347-
return twophase_buf;
1333+
return buf;
13481334
}
13491335

13501336
/*
@@ -1410,12 +1396,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14101396
* Read and validate the state file
14111397
*/
14121398
// buf = ReadTwoPhaseFile(xid, true);
1399+
// buf = twophase_buf;
14131400
buf = XlogReadTwoPhaseData(gxact->prepare_xlogptr);
1414-
if (buf == NULL)
1415-
ereport(ERROR,
1416-
(errcode(ERRCODE_DATA_CORRUPTED),
1417-
errmsg("two-phase state file for transaction %u is corrupt",
1418-
xid)));
14191401

14201402
/*
14211403
* Disassemble the header area
@@ -1435,6 +1417,15 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14351417
/* compute latestXid among all children */
14361418
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
14371419

1420+
1421+
// fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d)\n", gxact->gid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1422+
// fprintf(stderr, "FinishPrepared: %s={xlogptr:%X,lsn:%X,delta:%X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1423+
1424+
Assert(hdr->nsubxacts == 0);
1425+
Assert(hdr->ncommitrels == 0);
1426+
Assert(hdr->nabortrels == 0);
1427+
Assert(hdr->ninvalmsgs == 0);
1428+
14381429
/*
14391430
* The order of operations here is critical: make the XLOG entry for
14401431
* commit or abort, then mark the transaction committed or aborted in
@@ -2246,30 +2237,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
22462237
SyncRepWaitForLSN(recptr);
22472238
}
22482239

2249-
2250-
2251-
2252-
2253-
2254-
2255-
2256-
22572240
/**********************************************************************************/
22582241

22592242

2260-
// static int xlogreadfd = -1;
2261-
// static XLogSegNo xlogreadsegno = -1;
2262-
// static char xlogfpath[MAXPGPATH];
2263-
2264-
// typedef struct XLogPageReadPrivate
2265-
// {
2266-
// const char *datadir;
2267-
// TimeLineID tli;
2268-
// } XLogPageReadPrivate;
2269-
2270-
2271-
size_t
2272-
bogus_write(int fd, char *buf, size_t nbytes)
2243+
size_t
2244+
bogus_write(int fd, const void *buf, size_t nbytes)
22732245
{
22742246
memcpy(twophase_buf + twophase_pos, buf, nbytes);
22752247
twophase_pos += nbytes;
@@ -2284,8 +2256,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22842256
XLogReaderState *xlogreader;
22852257
char *errormsg;
22862258

2287-
fprintf(stderr, "XlogReadTwoPhaseData called\n");
2288-
22892259
xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
22902260
if (xlogreader == NULL)
22912261
fprintf(stderr, "xlogreader == NULL\n");
@@ -2296,20 +2266,5 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22962266
fprintf(stderr, "XLogReadRecord error\n");
22972267
}
22982268

2299-
// memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300-
// twophase_pos += nbytes;
2301-
// return nbytes;
2302-
2303-
// XLogReaderFree(xlogreader);
2304-
// if (xlogreadfd != -1)
2305-
// {
2306-
// close(xlogreadfd);
2307-
// xlogreadfd = -1;
2308-
// }
2309-
23102269
return XLogRecGetData(xlogreader);
23112270
}
2312-
2313-
2314-
2315-

src/backend/access/transam/xlog.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
321321
* stored here. The parallel leader advances its own copy, when necessary,
322322
* in WaitForParallelWorkersToFinish.
323323
*/
324-
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
325-
324+
XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
326325
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
327326
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
328327

src/include/access/xlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ typedef enum
8686
RECOVERY_TARGET_IMMEDIATE
8787
} RecoveryTargetType;
8888

89+
extern XLogRecPtr ProcLastRecPtr;
8990
extern XLogRecPtr XactLastRecEnd;
9091
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
9192

0 commit comments

Comments
 (0)