Skip to content

Commit 323cbe7

Browse files
committed
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using a callback function. Redesign the interface so that it tells the caller to insert more data with a special return value instead. This API suits later patches for prefetching, encryption and maybe other future projects that would otherwise require continually extending the callback interface.

As incidental cleanup work, move global variables readOff, readLen and readSegNo inside XLogReaderState.

Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Author: Heikki Linnakangas <hlinnaka@iki.fi> (parts of earlier version)
Reviewed-by: Antonin Houska <ah@cybertec.at>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
1 parent 5ac9c43 commit 323cbe7

File tree

13 files changed

+930
-677
lines changed

13 files changed

+930
-677
lines changed

src/backend/access/transam/twophase.c

+8-6
Original file line numberDiff line numberDiff line change
@@ -1330,19 +1330,21 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
13301330
char *errormsg;
13311331
TimeLineID save_currtli = ThisTimeLineID;
13321332

1333-
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1334-
XL_ROUTINE(.page_read = &read_local_xlog_page,
1335-
.segment_open = &wal_segment_open,
1336-
.segment_close = &wal_segment_close),
1337-
NULL);
1333+
xlogreader = XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
1334+
13381335
if (!xlogreader)
13391336
ereport(ERROR,
13401337
(errcode(ERRCODE_OUT_OF_MEMORY),
13411338
errmsg("out of memory"),
13421339
errdetail("Failed while allocating a WAL reading processor.")));
13431340

13441341
XLogBeginRead(xlogreader, lsn);
1345-
record = XLogReadRecord(xlogreader, &errormsg);
1342+
while (XLogReadRecord(xlogreader, &record, &errormsg) ==
1343+
XLREAD_NEED_DATA)
1344+
{
1345+
if (!read_local_xlog_page(xlogreader))
1346+
break;
1347+
}
13461348

13471349
/*
13481350
* Restore immediately the timeline where it was previously, as

src/backend/access/transam/xlog.c

+57-69
Original file line numberDiff line numberDiff line change
@@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0;
811811
* These variables are used similarly to the ones above, but for reading
812812
* the XLOG. Note, however, that readOff generally represents the offset
813813
* of the page just read, not the seek position of the FD itself, which
814-
* will be just past that page. readLen indicates how much of the current
815-
* page has been read into readBuf, and readSource indicates where we got
816-
* the currently open file from.
814+
* will be just past that page. readSource indicates where we got the
815+
* currently open file from.
817816
* Note: we could use Reserve/ReleaseExternalFD to track consumption of
818817
* this FD too; but it doesn't currently seem worthwhile, since the XLOG is
819818
* not read by general-purpose sessions.
820819
*/
821820
static int readFile = -1;
822-
static XLogSegNo readSegNo = 0;
823-
static uint32 readOff = 0;
824-
static uint32 readLen = 0;
825821
static XLogSource readSource = XLOG_FROM_ANY;
826822

827823
/*
@@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY;
838834
static bool lastSourceFailed = false;
839835
static bool pendingWalRcvRestart = false;
840836

841-
typedef struct XLogPageReadPrivate
842-
{
843-
int emode;
844-
bool fetching_ckpt; /* are we fetching a checkpoint record? */
845-
bool randAccess;
846-
} XLogPageReadPrivate;
847-
848837
/*
849838
* These variables track when we last obtained some WAL data to process,
850839
* and where we got it from. (XLogReceiptSource is initially the same as
@@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
920909
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
921910
XLogSource source, bool notfoundOk);
922911
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
923-
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
924-
int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
912+
static bool XLogPageRead(XLogReaderState *state,
913+
bool fetching_ckpt, int emode, bool randAccess);
925914
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
926-
bool fetching_ckpt, XLogRecPtr tliRecPtr);
915+
bool fetching_ckpt,
916+
XLogRecPtr tliRecPtr,
917+
XLogSegNo readSegNo);
927918
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
928919
static void XLogFileClose(void);
929920
static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata,
12341225
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
12351226

12361227
if (!debug_reader)
1237-
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
1238-
XL_ROUTINE(), NULL);
1228+
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
12391229

12401230
if (!debug_reader)
12411231
{
@@ -4373,21 +4363,24 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
43734363
bool fetching_ckpt)
43744364
{
43754365
XLogRecord *record;
4376-
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
4377-
4378-
/* Pass through parameters to XLogPageRead */
4379-
private->fetching_ckpt = fetching_ckpt;
4380-
private->emode = emode;
4381-
private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
4366+
bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
43824367

43834368
/* This is the first attempt to read this page. */
43844369
lastSourceFailed = false;
43854370

43864371
for (;;)
43874372
{
43884373
char *errormsg;
4374+
XLogReadRecordResult result;
4375+
4376+
while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
4377+
== XLREAD_NEED_DATA)
4378+
{
4379+
if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
4380+
break;
4381+
4382+
}
43894383

4390-
record = XLogReadRecord(xlogreader, &errormsg);
43914384
ReadRecPtr = xlogreader->ReadRecPtr;
43924385
EndRecPtr = xlogreader->EndRecPtr;
43934386
if (record == NULL)
@@ -6457,7 +6450,6 @@ StartupXLOG(void)
64576450
bool backupFromStandby = false;
64586451
DBState dbstate_at_startup;
64596452
XLogReaderState *xlogreader;
6460-
XLogPageReadPrivate private;
64616453
bool promoted = false;
64626454
struct stat st;
64636455

@@ -6616,13 +6608,9 @@ StartupXLOG(void)
66166608
OwnLatch(&XLogCtl->recoveryWakeupLatch);
66176609

66186610
/* Set up XLOG reader facility */
6619-
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
66206611
xlogreader =
6621-
XLogReaderAllocate(wal_segment_size, NULL,
6622-
XL_ROUTINE(.page_read = &XLogPageRead,
6623-
.segment_open = NULL,
6624-
.segment_close = wal_segment_close),
6625-
&private);
6612+
XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
6613+
66266614
if (!xlogreader)
66276615
ereport(ERROR,
66286616
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7819,7 +7807,8 @@ StartupXLOG(void)
78197807
XLogRecPtr pageBeginPtr;
78207808

78217809
pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
7822-
Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size));
7810+
Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) ==
7811+
XLogSegmentOffset(pageBeginPtr, wal_segment_size));
78237812

78247813
firstIdx = XLogRecPtrToBufIdx(EndOfLog);
78257814

@@ -12107,13 +12096,15 @@ CancelBackup(void)
1210712096
* XLogPageRead() to try fetching the record from another source, or to
1210812097
* sleep and retry.
1210912098
*/
12110-
static int
12111-
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
12112-
XLogRecPtr targetRecPtr, char *readBuf)
12113-
{
12114-
XLogPageReadPrivate *private =
12115-
(XLogPageReadPrivate *) xlogreader->private_data;
12116-
int emode = private->emode;
12099+
static bool
12100+
XLogPageRead(XLogReaderState *state,
12101+
bool fetching_ckpt, int emode, bool randAccess)
12102+
{
12103+
char *readBuf = state->readBuf;
12104+
XLogRecPtr targetPagePtr = state->readPagePtr;
12105+
int reqLen = state->reqLen;
12106+
int readLen = 0;
12107+
XLogRecPtr targetRecPtr = state->ReadRecPtr;
1211712108
uint32 targetPageOff;
1211812109
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
1211912110
int r;
@@ -12126,18 +12117,18 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1212612117
* is not in the currently open one.
1212712118
*/
1212812119
if (readFile >= 0 &&
12129-
!XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size))
12120+
!XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size))
1213012121
{
1213112122
/*
1213212123
* Request a restartpoint if we've replayed too much xlog since the
1213312124
* last one.
1213412125
*/
1213512126
if (bgwriterLaunched)
1213612127
{
12137-
if (XLogCheckpointNeeded(readSegNo))
12128+
if (XLogCheckpointNeeded(state->seg.ws_segno))
1213812129
{
1213912130
(void) GetRedoRecPtr();
12140-
if (XLogCheckpointNeeded(readSegNo))
12131+
if (XLogCheckpointNeeded(state->seg.ws_segno))
1214112132
RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
1214212133
}
1214312134
}
@@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1214712138
readSource = XLOG_FROM_ANY;
1214812139
}
1214912140

12150-
XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size);
12141+
XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size);
1215112142

1215212143
retry:
1215312144
/* See if we need to retrieve more data */
@@ -12156,17 +12147,15 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1215612147
flushedUpto < targetPagePtr + reqLen))
1215712148
{
1215812149
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
12159-
private->randAccess,
12160-
private->fetching_ckpt,
12161-
targetRecPtr))
12150+
randAccess, fetching_ckpt,
12151+
targetRecPtr, state->seg.ws_segno))
1216212152
{
1216312153
if (readFile >= 0)
1216412154
close(readFile);
1216512155
readFile = -1;
12166-
readLen = 0;
1216712156
readSource = XLOG_FROM_ANY;
12168-
12169-
return -1;
12157+
XLogReaderSetInputData(state, -1);
12158+
return false;
1217012159
}
1217112160
}
1217212161

@@ -12193,40 +12182,36 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1219312182
else
1219412183
readLen = XLOG_BLCKSZ;
1219512184

12196-
/* Read the requested page */
12197-
readOff = targetPageOff;
12198-
1219912185
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
12200-
r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
12186+
r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff);
1220112187
if (r != XLOG_BLCKSZ)
1220212188
{
1220312189
char fname[MAXFNAMELEN];
1220412190
int save_errno = errno;
1220512191

1220612192
pgstat_report_wait_end();
12207-
XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
12193+
XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size);
1220812194
if (r < 0)
1220912195
{
1221012196
errno = save_errno;
1221112197
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
1221212198
(errcode_for_file_access(),
1221312199
errmsg("could not read from log segment %s, offset %u: %m",
12214-
fname, readOff)));
12200+
fname, targetPageOff)));
1221512201
}
1221612202
else
1221712203
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
1221812204
(errcode(ERRCODE_DATA_CORRUPTED),
1221912205
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
12220-
fname, readOff, r, (Size) XLOG_BLCKSZ)));
12206+
fname, targetPageOff, r, (Size) XLOG_BLCKSZ)));
1222112207
goto next_record_is_invalid;
1222212208
}
1222312209
pgstat_report_wait_end();
1222412210

12225-
Assert(targetSegNo == readSegNo);
12226-
Assert(targetPageOff == readOff);
12227-
Assert(reqLen <= readLen);
12211+
Assert(targetSegNo == state->seg.ws_segno);
12212+
Assert(readLen >= reqLen);
1222812213

12229-
xlogreader->seg.ws_tli = curFileTLI;
12214+
state->seg.ws_tli = curFileTLI;
1223012215

1223112216
/*
1223212217
* Check the page header immediately, so that we can retry immediately if
@@ -12254,29 +12239,31 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1225412239
* Validating the page header is cheap enough that doing it twice
1225512240
* shouldn't be a big deal from a performance point of view.
1225612241
*/
12257-
if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf))
12242+
if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf))
1225812243
{
12259-
/* reset any error XLogReaderValidatePageHeader() might have set */
12260-
xlogreader->errormsg_buf[0] = '\0';
12244+
/* reset any error XLogReaderValidatePageHeader() might have set */
12245+
state->errormsg_buf[0] = '\0';
1226112246
goto next_record_is_invalid;
1226212247
}
1226312248

12264-
return readLen;
12249+
Assert(state->readPagePtr == targetPagePtr);
12250+
XLogReaderSetInputData(state, readLen);
12251+
return true;
1226512252

1226612253
next_record_is_invalid:
1226712254
lastSourceFailed = true;
1226812255

1226912256
if (readFile >= 0)
1227012257
close(readFile);
1227112258
readFile = -1;
12272-
readLen = 0;
1227312259
readSource = XLOG_FROM_ANY;
1227412260

1227512261
/* In standby-mode, keep trying */
1227612262
if (StandbyMode)
1227712263
goto retry;
12278-
else
12279-
return -1;
12264+
12265+
XLogReaderSetInputData(state, -1);
12266+
return false;
1228012267
}
1228112268

1228212269
/*
@@ -12307,7 +12294,8 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1230712294
*/
1230812295
static bool
1230912296
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
12310-
bool fetching_ckpt, XLogRecPtr tliRecPtr)
12297+
bool fetching_ckpt, XLogRecPtr tliRecPtr,
12298+
XLogSegNo readSegNo)
1231112299
{
1231212300
static TimestampTz last_fail_time = 0;
1231312301
TimestampTz now;

0 commit comments

Comments
 (0)