Skip to content

Commit 850196b

Browse files
committed
Adjust walsender usage of xlogreader, simplify APIs
* Have both physical and logical walsender share a 'xlogreader' state struct for tracking state. This replaces the existing globals sendSeg and sendCxt. * Change WALRead not to receive XLogReaderState->seg and ->segcxt as separate arguments anymore; just use the ones from 'state'. This is made possible by the above change. * Have the XLogReader segment_open contract require the callbacks to install the file descriptor in the state struct themselves instead of returning it. xlogreader was already ignoring any possible failed return from the callbacks, relying solely on them never returning on failure. (This point is not altogether excellent, as it means the callbacks have to know more of XLogReaderState; but to really improve on that we would have to pass back error info from the callbacks to xlogreader. And the complexity would not be saved but instead just transferred to the callers of WALRead, which would have to learn how to throw errors from the segment_open callback in addition to, as currently, from pg_pread.) * segment_open no longer receives the 'segcxt' as a separate argument, since it's part of the XLogReaderState argument. Per comments from Kyotaro Horiguchi. Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/20200511203336.GA9913@alvherre.pgsql
1 parent 043e3e0 commit 850196b

File tree

6 files changed

+83
-105
lines changed

6 files changed

+83
-105
lines changed

src/backend/access/transam/xlogreader.c

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,14 +1044,12 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
10441044

10451045
/*
10461046
* Helper function to ease writing of XLogRoutine->page_read callbacks.
1047-
* If this function is used, caller must supply an open_segment callback in
1047+
* If this function is used, caller must supply a segment_open callback in
10481048
* 'state', as that is used here.
10491049
*
10501050
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
10511051
* fetched from timeline 'tli'.
10521052
*
1053-
* 'seg/segcxt' identify the last segment used.
1054-
*
10551053
* Returns true if succeeded, false if an error occurs, in which case
10561054
* 'errinfo' receives error details.
10571055
*
@@ -1061,7 +1059,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
10611059
bool
10621060
WALRead(XLogReaderState *state,
10631061
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
1064-
WALOpenSegment *seg, WALSegmentContext *segcxt,
10651062
WALReadError *errinfo)
10661063
{
10671064
char *p;
@@ -1078,34 +1075,36 @@ WALRead(XLogReaderState *state,
10781075
int segbytes;
10791076
int readbytes;
10801077

1081-
startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
1078+
startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
10821079

10831080
/*
10841081
* If the data we want is not in a segment we have open, close what we
10851082
* have (if anything) and open the next one, using the caller's
10861083
* provided segment_open callback.
10871084
*/
1088-
if (seg->ws_file < 0 ||
1089-
!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
1090-
tli != seg->ws_tli)
1085+
if (state->seg.ws_file < 0 ||
1086+
!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
1087+
tli != state->seg.ws_tli)
10911088
{
10921089
XLogSegNo nextSegNo;
10931090

1094-
if (seg->ws_file >= 0)
1091+
if (state->seg.ws_file >= 0)
10951092
state->routine.segment_close(state);
10961093

1097-
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
1098-
seg->ws_file = state->routine.segment_open(state, nextSegNo,
1099-
segcxt, &tli);
1094+
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
1095+
state->routine.segment_open(state, nextSegNo, &tli);
1096+
1097+
/* This shouldn't happen -- indicates a bug in segment_open */
1098+
Assert(state->seg.ws_file >= 0);
11001099

11011100
/* Update the current segment info. */
1102-
seg->ws_tli = tli;
1103-
seg->ws_segno = nextSegNo;
1101+
state->seg.ws_tli = tli;
1102+
state->seg.ws_segno = nextSegNo;
11041103
}
11051104

11061105
/* How many bytes are within this segment? */
1107-
if (nbytes > (segcxt->ws_segsize - startoff))
1108-
segbytes = segcxt->ws_segsize - startoff;
1106+
if (nbytes > (state->segcxt.ws_segsize - startoff))
1107+
segbytes = state->segcxt.ws_segsize - startoff;
11091108
else
11101109
segbytes = nbytes;
11111110

@@ -1115,7 +1114,7 @@ WALRead(XLogReaderState *state,
11151114

11161115
/* Reset errno first; eases reporting non-errno-affecting errors */
11171116
errno = 0;
1118-
readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
1117+
readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
11191118

11201119
#ifndef FRONTEND
11211120
pgstat_report_wait_end();
@@ -1127,7 +1126,7 @@ WALRead(XLogReaderState *state,
11271126
errinfo->wre_req = segbytes;
11281127
errinfo->wre_read = readbytes;
11291128
errinfo->wre_off = startoff;
1130-
errinfo->wre_seg = *seg;
1129+
errinfo->wre_seg = state->seg;
11311130
return false;
11321131
}
11331132

src/backend/access/transam/xlogutils.c

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -784,18 +784,17 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
784784
}
785785

786786
/* XLogReaderRoutine->segment_open callback for local pg_wal files */
787-
int
787+
void
788788
wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
789-
WALSegmentContext *segcxt, TimeLineID *tli_p)
789+
TimeLineID *tli_p)
790790
{
791791
TimeLineID tli = *tli_p;
792792
char path[MAXPGPATH];
793-
int fd;
794793

795-
XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
796-
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
797-
if (fd >= 0)
798-
return fd;
794+
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
795+
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
796+
if (state->seg.ws_file >= 0)
797+
return;
799798

800799
if (errno == ENOENT)
801800
ereport(ERROR,
@@ -807,8 +806,6 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
807806
(errcode_for_file_access(),
808807
errmsg("could not open file \"%s\": %m",
809808
path)));
810-
811-
return -1; /* keep compiler quiet */
812809
}
813810

814811
/* stock XLogReaderRoutine->segment_close callback */
@@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
947944
* zero-padded up to the page boundary if it's incomplete.
948945
*/
949946
if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
950-
&state->seg, &state->segcxt,
951947
&errinfo))
952948
WALReadRaiseError(&errinfo);
953949

src/backend/replication/walsender.c

Lines changed: 47 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,14 @@ bool log_replication_commands = false;
129129
*/
130130
bool wake_wal_senders = false;
131131

132-
static WALOpenSegment *sendSeg = NULL;
133-
static WALSegmentContext *sendCxt = NULL;
132+
/*
133+
* Physical walsender does not use xlogreader to read WAL, but it does use a
134+
* fake one to keep state. Logical walsender uses a proper xlogreader. Both
135+
* keep the 'xlogreader' pointer to the right one, for the sake of common
136+
* routines.
137+
*/
138+
static XLogReaderState fake_xlogreader;
139+
static XLogReaderState *xlogreader;
134140

135141
/*
136142
* These variables keep track of the state of the timeline we're currently
@@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
248254
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
249255
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
250256

251-
static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
252-
WALSegmentContext *segcxt, TimeLineID *tli_p);
257+
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
258+
TimeLineID *tli_p);
253259
static void UpdateSpillStats(LogicalDecodingContext *ctx);
254260

255261

@@ -280,12 +286,19 @@ InitWalSender(void)
280286
/* Initialize empty timestamp buffer for lag tracking. */
281287
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
282288

283-
/* Make sure we can remember the current read position in XLOG. */
284-
sendSeg = (WALOpenSegment *)
285-
MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
286-
sendCxt = (WALSegmentContext *)
287-
MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
288-
WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
289+
/*
290+
* Prepare physical walsender's fake xlogreader struct. Logical walsender
291+
* does this later.
292+
*/
293+
if (!am_db_walsender)
294+
{
295+
xlogreader = &fake_xlogreader;
296+
xlogreader->routine =
297+
*XL_ROUTINE(.segment_open = WalSndSegmentOpen,
298+
.segment_close = wal_segment_close);
299+
WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
300+
wal_segment_size, NULL);
301+
}
289302
}
290303

291304
/*
@@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
302315
ConditionVariableCancelSleep();
303316
pgstat_report_wait_end();
304317

305-
if (sendSeg->ws_file >= 0)
306-
{
307-
close(sendSeg->ws_file);
308-
sendSeg->ws_file = -1;
309-
}
318+
if (xlogreader->seg.ws_file >= 0)
319+
wal_segment_close(xlogreader);
310320

311321
if (MyReplicationSlot != NULL)
312322
ReplicationSlotRelease();
@@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
837847
cur_page,
838848
targetPagePtr,
839849
XLOG_BLCKSZ,
840-
sendSeg->ws_tli, /* Pass the current TLI because only
850+
state->seg.ws_tli, /* Pass the current TLI because only
841851
* WalSndSegmentOpen controls whether new
842852
* TLI is needed. */
843-
sendSeg,
844-
sendCxt,
845853
&errinfo))
846854
WALReadRaiseError(&errinfo);
847855

@@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
852860
* read() succeeds in that case, but the data we tried to read might
853861
* already have been overwritten with new WAL records.
854862
*/
855-
XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
856-
CheckXLogRemoved(segno, sendSeg->ws_tli);
863+
XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
864+
CheckXLogRemoved(segno, state->seg.ws_tli);
857865

858866
return count;
859867
}
@@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
11761184
.segment_close = wal_segment_close),
11771185
WalSndPrepareWrite, WalSndWriteData,
11781186
WalSndUpdateProgress);
1187+
xlogreader = logical_decoding_ctx->reader;
11791188

11801189
WalSndSetState(WALSNDSTATE_CATCHUP);
11811190

@@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
24472456
}
24482457

24492458
/* XLogReaderRoutine->segment_open callback */
2450-
static int
2451-
WalSndSegmentOpen(XLogReaderState *state,
2452-
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
2459+
static void
2460+
WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
24532461
TimeLineID *tli_p)
24542462
{
24552463
char path[MAXPGPATH];
2456-
int fd;
24572464

24582465
/*-------
24592466
* When reading from a historic timeline, and there is a timeline switch
@@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
24842491
{
24852492
XLogSegNo endSegNo;
24862493

2487-
XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
2488-
if (sendSeg->ws_segno == endSegNo)
2494+
XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2495+
if (state->seg.ws_segno == endSegNo)
24892496
*tli_p = sendTimeLineNextTLI;
24902497
}
24912498

2492-
XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
2493-
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2494-
if (fd >= 0)
2495-
return fd;
2499+
XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2500+
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2501+
if (state->seg.ws_file >= 0)
2502+
return;
24962503

24972504
/*
24982505
* If the file is not found, assume it's because the standby asked for a
@@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
25152522
(errcode_for_file_access(),
25162523
errmsg("could not open file \"%s\": %m",
25172524
path)));
2518-
return -1; /* keep compiler quiet */
25192525
}
25202526

25212527
/*
@@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
25372543
Size nbytes;
25382544
XLogSegNo segno;
25392545
WALReadError errinfo;
2540-
static XLogReaderState fake_xlogreader =
2541-
{
2542-
/* Fake xlogreader state for WALRead */
2543-
.routine.segment_open = WalSndSegmentOpen,
2544-
.routine.segment_close = wal_segment_close
2545-
};
25462546

25472547
/* If requested switch the WAL sender to the stopping state. */
25482548
if (got_STOPPING)
@@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
26852685
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
26862686
{
26872687
/* close the current file. */
2688-
if (sendSeg->ws_file >= 0)
2689-
close(sendSeg->ws_file);
2690-
sendSeg->ws_file = -1;
2688+
if (xlogreader->seg.ws_file >= 0)
2689+
wal_segment_close(xlogreader);
26912690

26922691
/* Send CopyDone */
26932692
pq_putmessage_noblock('c', NULL, 0);
@@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
27602759
enlargeStringInfo(&output_message, nbytes);
27612760

27622761
retry:
2763-
if (!WALRead(&fake_xlogreader,
2762+
if (!WALRead(xlogreader,
27642763
&output_message.data[output_message.len],
27652764
startptr,
27662765
nbytes,
2767-
sendSeg->ws_tli, /* Pass the current TLI because only
2768-
* WalSndSegmentOpen controls whether new
2769-
* TLI is needed. */
2770-
sendSeg,
2771-
sendCxt,
2766+
xlogreader->seg.ws_tli, /* Pass the current TLI because
2767+
* only WalSndSegmentOpen controls
2768+
* whether new TLI is needed. */
27722769
&errinfo))
27732770
WALReadRaiseError(&errinfo);
27742771

27752772
/* See logical_read_xlog_page(). */
2776-
XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
2777-
CheckXLogRemoved(segno, sendSeg->ws_tli);
2773+
XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2774+
CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
27782775

27792776
/*
27802777
* During recovery, the currently-open WAL file might be replaced with the
@@ -2792,10 +2789,9 @@ XLogSendPhysical(void)
27922789
walsnd->needreload = false;
27932790
SpinLockRelease(&walsnd->mutex);
27942791

2795-
if (reload && sendSeg->ws_file >= 0)
2792+
if (reload && xlogreader->seg.ws_file >= 0)
27962793
{
2797-
close(sendSeg->ws_file);
2798-
sendSeg->ws_file = -1;
2794+
wal_segment_close(xlogreader);
27992795

28002796
goto retry;
28012797
}

src/bin/pg_waldump/pg_waldump.c

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname)
280280
}
281281

282282
/* pg_waldump's XLogReaderRoutine->segment_open callback */
283-
static int
284-
WALDumpOpenSegment(XLogReaderState *state,
285-
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
283+
static void
284+
WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo,
286285
TimeLineID *tli_p)
287286
{
288287
TimeLineID tli = *tli_p;
289288
char fname[MAXPGPATH];
290-
int fd;
291289
int tries;
292290

293-
XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
291+
XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
294292

295293
/*
296294
* In follow mode there is a short period of time after the server has
@@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state,
300298
*/
301299
for (tries = 0; tries < 10; tries++)
302300
{
303-
fd = open_file_in_directory(segcxt->ws_dir, fname);
304-
if (fd >= 0)
305-
return fd;
301+
state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname);
302+
if (state->seg.ws_file >= 0)
303+
return;
306304
if (errno == ENOENT)
307305
{
308306
int save_errno = errno;
@@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state,
318316
}
319317

320318
fatal_error("could not find file \"%s\": %m", fname);
321-
return -1; /* keep compiler quiet */
322319
}
323320

324321
/*
@@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
356353
}
357354

358355
if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
359-
&state->seg, &state->segcxt,
360356
&errinfo))
361357
{
362358
WALOpenSegment *seg = &errinfo.wre_seg;

0 commit comments

Comments
 (0)