Skip to content

Commit 422a55a

Browse files
Refactor to create generic WAL page read callback
Previously we didn’t have a generic WAL page read callback function, surprisingly. Logical decoding has logical_read_local_xlog_page(), which was actually generic, so move that to xlogfunc.c and rename to read_local_xlog_page(). Maintain logical_read_local_xlog_page() so existing callers still work. As requested by Michael Paquier, Alvaro Herrera and Andres Freund
1 parent 45be99f commit 422a55a

File tree

3 files changed

+172
-155
lines changed

3 files changed

+172
-155
lines changed

src/backend/access/transam/xlogutils.c

+166
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
*/
1818
#include "postgres.h"
1919

20+
#include <unistd.h>
21+
22+
#include "miscadmin.h"
23+
2024
#include "access/xlog.h"
25+
#include "access/xlog_internal.h"
2126
#include "access/xlogutils.h"
2227
#include "catalog/catalog.h"
2328
#include "storage/smgr.h"
@@ -631,3 +636,164 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
631636
{
632637
forget_invalid_pages(rnode, forkNum, nblocks);
633638
}
639+
640+
/*
641+
* TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
642+
* we currently don't have the infrastructure (elog!) to share it.
643+
*/
644+
static void
645+
XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
646+
{
647+
char *p;
648+
XLogRecPtr recptr;
649+
Size nbytes;
650+
651+
static int sendFile = -1;
652+
static XLogSegNo sendSegNo = 0;
653+
static uint32 sendOff = 0;
654+
655+
p = buf;
656+
recptr = startptr;
657+
nbytes = count;
658+
659+
while (nbytes > 0)
660+
{
661+
uint32 startoff;
662+
int segbytes;
663+
int readbytes;
664+
665+
startoff = recptr % XLogSegSize;
666+
667+
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
668+
{
669+
char path[MAXPGPATH];
670+
671+
/* Switch to another logfile segment */
672+
if (sendFile >= 0)
673+
close(sendFile);
674+
675+
XLByteToSeg(recptr, sendSegNo);
676+
677+
XLogFilePath(path, tli, sendSegNo);
678+
679+
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
680+
681+
if (sendFile < 0)
682+
{
683+
if (errno == ENOENT)
684+
ereport(ERROR,
685+
(errcode_for_file_access(),
686+
errmsg("requested WAL segment %s has already been removed",
687+
path)));
688+
else
689+
ereport(ERROR,
690+
(errcode_for_file_access(),
691+
errmsg("could not open file \"%s\": %m",
692+
path)));
693+
}
694+
sendOff = 0;
695+
}
696+
697+
/* Need to seek in the file? */
698+
if (sendOff != startoff)
699+
{
700+
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
701+
{
702+
char path[MAXPGPATH];
703+
704+
XLogFilePath(path, tli, sendSegNo);
705+
706+
ereport(ERROR,
707+
(errcode_for_file_access(),
708+
errmsg("could not seek in log segment %s to offset %u: %m",
709+
path, startoff)));
710+
}
711+
sendOff = startoff;
712+
}
713+
714+
/* How many bytes are within this segment? */
715+
if (nbytes > (XLogSegSize - startoff))
716+
segbytes = XLogSegSize - startoff;
717+
else
718+
segbytes = nbytes;
719+
720+
readbytes = read(sendFile, p, segbytes);
721+
if (readbytes <= 0)
722+
{
723+
char path[MAXPGPATH];
724+
725+
XLogFilePath(path, tli, sendSegNo);
726+
727+
ereport(ERROR,
728+
(errcode_for_file_access(),
729+
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
730+
path, sendOff, (unsigned long) segbytes)));
731+
}
732+
733+
/* Update state for read */
734+
recptr += readbytes;
735+
736+
sendOff += readbytes;
737+
nbytes -= readbytes;
738+
p += readbytes;
739+
}
740+
}
741+
742+
/*
743+
* read_page callback for reading local xlog files
744+
*
745+
* Public because it would likely be very helpful for someone writing another
746+
* output method outside walsender, e.g. in a bgworker.
747+
*
748+
* TODO: The walsender has it's own version of this, but it relies on the
749+
* walsender's latch being set whenever WAL is flushed. No such infrastructure
750+
* exists for normal backends, so we have to do a check/sleep/repeat style of
751+
* loop for now.
752+
*/
753+
int
754+
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
755+
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
756+
{
757+
XLogRecPtr flushptr,
758+
loc;
759+
int count;
760+
761+
loc = targetPagePtr + reqLen;
762+
while (1)
763+
{
764+
/*
765+
* TODO: we're going to have to do something more intelligent about
766+
* timelines on standbys. Use readTimeLineHistory() and
767+
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
768+
* that case earlier, but the code and TODO is left in here for when
769+
* that changes.
770+
*/
771+
if (!RecoveryInProgress())
772+
{
773+
*pageTLI = ThisTimeLineID;
774+
flushptr = GetFlushRecPtr();
775+
}
776+
else
777+
flushptr = GetXLogReplayRecPtr(pageTLI);
778+
779+
if (loc <= flushptr)
780+
break;
781+
782+
CHECK_FOR_INTERRUPTS();
783+
pg_usleep(1000L);
784+
}
785+
786+
/* more than one block available */
787+
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
788+
count = XLOG_BLCKSZ;
789+
/* not enough data there */
790+
else if (targetPagePtr + reqLen > flushptr)
791+
return -1;
792+
/* part of the page available */
793+
else
794+
count = flushptr - targetPagePtr;
795+
796+
XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
797+
798+
return count;
799+
}

src/backend/replication/logical/logicalfuncs.c

+3-155
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "miscadmin.h"
2323

2424
#include "access/xlog_internal.h"
25+
#include "access/xlogutils.h"
2526

2627
#include "catalog/pg_type.h"
2728

@@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
100101
p->returned_rows++;
101102
}
102103

103-
/*
104-
* TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
105-
* we currently don't have the infrastructure (elog!) to share it.
106-
*/
107-
static void
108-
XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
109-
{
110-
char *p;
111-
XLogRecPtr recptr;
112-
Size nbytes;
113-
114-
static int sendFile = -1;
115-
static XLogSegNo sendSegNo = 0;
116-
static uint32 sendOff = 0;
117-
118-
p = buf;
119-
recptr = startptr;
120-
nbytes = count;
121-
122-
while (nbytes > 0)
123-
{
124-
uint32 startoff;
125-
int segbytes;
126-
int readbytes;
127-
128-
startoff = recptr % XLogSegSize;
129-
130-
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
131-
{
132-
char path[MAXPGPATH];
133-
134-
/* Switch to another logfile segment */
135-
if (sendFile >= 0)
136-
close(sendFile);
137-
138-
XLByteToSeg(recptr, sendSegNo);
139-
140-
XLogFilePath(path, tli, sendSegNo);
141-
142-
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
143-
144-
if (sendFile < 0)
145-
{
146-
if (errno == ENOENT)
147-
ereport(ERROR,
148-
(errcode_for_file_access(),
149-
errmsg("requested WAL segment %s has already been removed",
150-
path)));
151-
else
152-
ereport(ERROR,
153-
(errcode_for_file_access(),
154-
errmsg("could not open file \"%s\": %m",
155-
path)));
156-
}
157-
sendOff = 0;
158-
}
159-
160-
/* Need to seek in the file? */
161-
if (sendOff != startoff)
162-
{
163-
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
164-
{
165-
char path[MAXPGPATH];
166-
167-
XLogFilePath(path, tli, sendSegNo);
168-
169-
ereport(ERROR,
170-
(errcode_for_file_access(),
171-
errmsg("could not seek in log segment %s to offset %u: %m",
172-
path, startoff)));
173-
}
174-
sendOff = startoff;
175-
}
176-
177-
/* How many bytes are within this segment? */
178-
if (nbytes > (XLogSegSize - startoff))
179-
segbytes = XLogSegSize - startoff;
180-
else
181-
segbytes = nbytes;
182-
183-
readbytes = read(sendFile, p, segbytes);
184-
if (readbytes <= 0)
185-
{
186-
char path[MAXPGPATH];
187-
188-
XLogFilePath(path, tli, sendSegNo);
189-
190-
ereport(ERROR,
191-
(errcode_for_file_access(),
192-
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
193-
path, sendOff, (unsigned long) segbytes)));
194-
}
195-
196-
/* Update state for read */
197-
recptr += readbytes;
198-
199-
sendOff += readbytes;
200-
nbytes -= readbytes;
201-
p += readbytes;
202-
}
203-
}
204-
205104
static void
206105
check_permissions(void)
207106
{
@@ -211,63 +110,12 @@ check_permissions(void)
211110
(errmsg("must be superuser or replication role to use replication slots"))));
212111
}
213112

214-
/*
215-
* read_page callback for logical decoding contexts.
216-
*
217-
* Public because it would likely be very helpful for someone writing another
218-
* output method outside walsender, e.g. in a bgworker.
219-
*
220-
* TODO: The walsender has it's own version of this, but it relies on the
221-
* walsender's latch being set whenever WAL is flushed. No such infrastructure
222-
* exists for normal backends, so we have to do a check/sleep/repeat style of
223-
* loop for now.
224-
*/
225113
int
226114
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
227115
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
228116
{
229-
XLogRecPtr flushptr,
230-
loc;
231-
int count;
232-
233-
loc = targetPagePtr + reqLen;
234-
while (1)
235-
{
236-
/*
237-
* TODO: we're going to have to do something more intelligent about
238-
* timelines on standbys. Use readTimeLineHistory() and
239-
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
240-
* that case earlier, but the code and TODO is left in here for when
241-
* that changes.
242-
*/
243-
if (!RecoveryInProgress())
244-
{
245-
*pageTLI = ThisTimeLineID;
246-
flushptr = GetFlushRecPtr();
247-
}
248-
else
249-
flushptr = GetXLogReplayRecPtr(pageTLI);
250-
251-
if (loc <= flushptr)
252-
break;
253-
254-
CHECK_FOR_INTERRUPTS();
255-
pg_usleep(1000L);
256-
}
257-
258-
/* more than one block available */
259-
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
260-
count = XLOG_BLCKSZ;
261-
/* not enough data there */
262-
else if (targetPagePtr + reqLen > flushptr)
263-
return -1;
264-
/* part of the page available */
265-
else
266-
count = flushptr - targetPagePtr;
267-
268-
XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
269-
270-
return count;
117+
return read_local_xlog_page(state, targetPagePtr, reqLen,
118+
targetRecPtr, cur_page, pageTLI);
271119
}
272120

273121
/*

src/include/access/xlogutils.h

+3
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
4747
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
4848
extern void FreeFakeRelcacheEntry(Relation fakerel);
4949

50+
extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
51+
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
52+
5053
#endif

0 commit comments

Comments
 (0)