Skip to content

Commit 8d791a5

Browse files
committed
read 2pc data from WAL
1 parent 2413fd4 commit 8d791a5

File tree

1 file changed

+40
-159
lines changed

1 file changed

+40
-159
lines changed

src/backend/access/transam/twophase.c

Lines changed: 40 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include "replication/origin.h"
6363
#include "replication/syncrep.h"
6464
#include "replication/walsender.h"
65+
#include "replication/logicalfuncs.h"
6566
#include "storage/fd.h"
6667
#include "storage/ipc.h"
6768
#include "storage/predicate.h"
@@ -174,6 +175,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174175
static char twophase_buf[10*1024];
175176
static int twophase_pos = 0;
176177
size_t bogus_write(int fd, char *buf, size_t nbytes);
178+
179+
static char *XlogReadTwoPhaseData(XLogRecPtr lsn);
177180
// LWLock *xlogreclock;
178181

179182
/*
@@ -1156,8 +1159,10 @@ EndPrepare(GlobalTransaction gxact)
11561159
XLogFlush(gxact->prepare_lsn);
11571160

11581161

1159-
fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n", gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1160-
fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n", gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1162+
// fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163+
// gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164+
// fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165+
// gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
11611166

11621167

11631168
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1404,7 +1409,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14041409
/*
14051410
* Read and validate the state file
14061411
*/
1407-
buf = ReadTwoPhaseFile(xid, true);
1412+
// buf = ReadTwoPhaseFile(xid, true);
1413+
buf = XlogReadTwoPhaseData(gxact->prepare_xlogptr);
14081414
if (buf == NULL)
14091415
ereport(ERROR,
14101416
(errcode(ERRCODE_DATA_CORRUPTED),
@@ -2251,15 +2257,16 @@ RecordTransactionAbortPrepared(TransactionId xid,
22512257
/**********************************************************************************/
22522258

22532259

2254-
static int xlogreadfd = -1;
2255-
static XLogSegNo xlogreadsegno = -1;
2256-
static char xlogfpath[MAXPGPATH];
2260+
// static int xlogreadfd = -1;
2261+
// static XLogSegNo xlogreadsegno = -1;
2262+
// static char xlogfpath[MAXPGPATH];
2263+
2264+
// typedef struct XLogPageReadPrivate
2265+
// {
2266+
// const char *datadir;
2267+
// TimeLineID tli;
2268+
// } XLogPageReadPrivate;
22572269

2258-
typedef struct XLogPageReadPrivate
2259-
{
2260-
const char *datadir;
2261-
TimeLineID tli;
2262-
} XLogPageReadPrivate;
22632270

22642271
size_t
22652272
bogus_write(int fd, char *buf, size_t nbytes)
@@ -2270,165 +2277,39 @@ bogus_write(int fd, char *buf, size_t nbytes)
22702277
}
22712278

22722279

2273-
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
2274-
XLogRecPtr targetPagePtr,
2275-
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
2276-
TimeLineID *pageTLI);
2277-
2278-
2279-
/* XLogreader callback function, to read a WAL page */
2280-
static int
2281-
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
2282-
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
2283-
TimeLineID *pageTLI)
2280+
static char *
2281+
XlogReadTwoPhaseData(XLogRecPtr lsn)
22842282
{
2285-
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
2286-
uint32 targetPageOff;
2287-
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
2283+
XLogRecord *record;
2284+
XLogReaderState *xlogreader;
2285+
char *errormsg;
22882286

2289-
XLByteToSeg(targetPagePtr, targetSegNo);
2290-
targetPageOff = targetPagePtr % XLogSegSize;
2287+
fprintf(stderr, "XlogReadTwoPhaseData called\n");
22912288

2292-
/*
2293-
* See if we need to switch to a new segment because the requested record
2294-
* is not in the currently open one.
2295-
*/
2296-
if (xlogreadfd >= 0 && !XLByteInSeg(targetPagePtr, xlogreadsegno))
2297-
{
2298-
close(xlogreadfd);
2299-
xlogreadfd = -1;
2300-
}
2289+
xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
2290+
if (xlogreader == NULL)
2291+
fprintf(stderr, "xlogreader == NULL\n");
23012292

2302-
XLByteToSeg(targetPagePtr, xlogreadsegno);
2303-
2304-
if (xlogreadfd < 0)
2293+
record = XLogReadRecord(xlogreader, lsn, &errormsg);
2294+
if (record == NULL)
23052295
{
2306-
char xlogfname[MAXFNAMELEN];
2307-
2308-
XLogFileName(xlogfname, private->tli, xlogreadsegno);
2309-
2310-
snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
2311-
2312-
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
2313-
2314-
if (xlogreadfd < 0)
2315-
{
2316-
printf(_("could not open file \"%s\": %s\n"), xlogfpath,
2317-
strerror(errno));
2318-
return -1;
2319-
}
2296+
fprintf(stderr, "XLogReadRecord error\n");
23202297
}
23212298

2322-
/*
2323-
* At this point, we have the right segment open.
2324-
*/
2325-
Assert(xlogreadfd != -1);
2299+
// memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300+
// twophase_pos += nbytes;
2301+
// return nbytes;
23262302

2327-
/* Read the requested page */
2328-
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
2329-
{
2330-
printf(_("could not seek in file \"%s\": %s\n"), xlogfpath,
2331-
strerror(errno));
2332-
return -1;
2333-
}
2334-
2335-
if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2336-
{
2337-
printf(_("could not read from file \"%s\": %s\n"), xlogfpath,
2338-
strerror(errno));
2339-
return -1;
2340-
}
2341-
2342-
Assert(targetSegNo == xlogreadsegno);
2303+
// XLogReaderFree(xlogreader);
2304+
// if (xlogreadfd != -1)
2305+
// {
2306+
// close(xlogreadfd);
2307+
// xlogreadfd = -1;
2308+
// }
23432309

2344-
*pageTLI = private->tli;
2345-
return XLOG_BLCKSZ;
2310+
return XLogRecGetData(xlogreader);
23462311
}
23472312

2348-
// XLogRecPtr
2349-
// readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli);
2350-
2351-
// XLogRecPtr
2352-
// readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli)
2353-
// {
2354-
// XLogRecord *record;
2355-
// XLogReaderState *xlogreader;
2356-
// char *errormsg;
2357-
// XLogPageReadPrivate private;
2358-
// XLogRecPtr endptr;
2359-
2360-
// private.datadir = datadir;
2361-
// private.tli = tli;
2362-
// xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2363-
// if (xlogreader == NULL)
2364-
// pg_fatal("out of memory\n");
2365-
2366-
// record = XLogReadRecord(xlogreader, ptr, &errormsg);
2367-
// if (record == NULL)
2368-
// {
2369-
// if (errormsg)
2370-
// pg_fatal("could not read WAL record at %X/%X: %s\n",
2371-
// (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2372-
// else
2373-
// pg_fatal("could not read WAL record at %X/%X\n",
2374-
// (uint32) (ptr >> 32), (uint32) (ptr));
2375-
// }
2376-
// endptr = xlogreader->EndRecPtr;
2377-
2378-
// XLogReaderFree(xlogreader);
2379-
// if (xlogreadfd != -1)
2380-
// {
2381-
// close(xlogreadfd);
2382-
// xlogreadfd = -1;
2383-
// }
2384-
2385-
// return endptr;
2386-
// }
2387-
2388-
2389-
// static char *
2390-
// XlogReadTwoPhaseData(XLogRecPtr lsn, bool give_warnings, TimeLineID tli)
2391-
// {
2392-
// XLogRecord *record;
2393-
// XLogReaderState *xlogreader;
2394-
// XLogPageReadPrivate private;
2395-
2396-
// private.datadir = datadir;
2397-
// private.tli = tli;
2398-
// xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2399-
// if (xlogreader == NULL)
2400-
// pg_fatal("out of memory\n");
2401-
2402-
// record = XLogReadRecord(xlogreader, ptr, &errormsg);
2403-
// if (record == NULL)
2404-
// {
2405-
// if (errormsg)
2406-
// pg_fatal("could not read WAL record at %X/%X: %s\n",
2407-
// (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2408-
// else
2409-
// pg_fatal("could not read WAL record at %X/%X\n",
2410-
// (uint32) (ptr >> 32), (uint32) (ptr));
2411-
// }
2412-
// endptr = xlogreader->EndRecPtr;
2413-
2414-
// XLogReaderFree(xlogreader);
2415-
// if (xlogreadfd != -1)
2416-
// {
2417-
// close(xlogreadfd);
2418-
// xlogreadfd = -1;
2419-
// }
2420-
2421-
// return XLogRecGetData(record)
2422-
// }
2423-
2424-
2425-
2426-
2427-
2428-
2429-
2430-
2431-
24322313

24332314

24342315

0 commit comments

Comments
 (0)