Skip to content

Commit dd9b3fc

Browse files
committed
Fix issue with WAL archiving in standby.
Previously, walreceiver closed the currently-opened WAL segment and created its archive notification file only after it had finished writing the current segment and had then received WAL data that belonged in the next segment. If walreceiver exited just before any WAL data for the next segment arrived at the standby, it did not create the archive notification file for the current segment, even though that segment was known to be complete. This behavior could delay WAL archiving of the segment until a subsequent restartpoint or checkpoint created its notification file. To fix the issue, this commit changes walreceiver so that it creates the archive notification file for the current WAL segment as soon as that segment is known to be complete, without waiting for the next WAL data to arrive. Back-patch to all supported branches. Reported-by: Kyotaro Horiguchi Author: Fujii Masao Reviewed-by: Kyotaro Horiguchi Discussion: https://postgr.es/m/20200630.165503.1465894182551545886.horikyota.ntt@gmail.com
1 parent da7d81d commit dd9b3fc

File tree

1 file changed

+62
-37
lines changed

1 file changed

+62
-37
lines changed

src/backend/replication/walreceiver.c

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ static void WalRcvDie(int code, Datum arg);
131131
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
132132
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
133133
static void XLogWalRcvFlush(bool dying);
134+
static void XLogWalRcvClose(XLogRecPtr recptr);
134135
static void XLogWalRcvSendReply(bool force, bool requestReply);
135136
static void XLogWalRcvSendHSFeedback(bool immed);
136137
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -911,47 +912,16 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
911912
{
912913
int segbytes;
913914

914-
if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
915-
{
916-
bool use_existent;
917-
918-
/*
919-
* fsync() and close current file before we switch to next one. We
920-
* would otherwise have to reopen this file to fsync it later
921-
*/
922-
if (recvFile >= 0)
923-
{
924-
char xlogfname[MAXFNAMELEN];
925-
926-
XLogWalRcvFlush(false);
927-
928-
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
929-
930-
/*
931-
* XLOG segment files will be re-read by recovery in startup
932-
* process soon, so we don't advise the OS to release cache
933-
* pages associated with the file like XLogFileClose() does.
934-
*/
935-
if (close(recvFile) != 0)
936-
ereport(PANIC,
937-
(errcode_for_file_access(),
938-
errmsg("could not close log segment %s: %m",
939-
xlogfname)));
915+
/* Close the current segment if it's completed */
916+
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
917+
XLogWalRcvClose(recptr);
940918

941-
/*
942-
* Create .done file forcibly to prevent the streamed segment
943-
* from being archived later.
944-
*/
945-
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
946-
XLogArchiveForceDone(xlogfname);
947-
else
948-
XLogArchiveNotify(xlogfname);
949-
}
950-
recvFile = -1;
919+
if (recvFile < 0)
920+
{
921+
bool use_existent = true;
951922

952923
/* Create/use new log file */
953924
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
954-
use_existent = true;
955925
recvFile = XLogFileInit(recvSegNo, &use_existent, true);
956926
recvFileTLI = ThisTimeLineID;
957927
}
@@ -998,6 +968,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
998968

999969
/* Update shared-memory status */
1000970
pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
971+
972+
/*
973+
* Close the current segment if it's fully written up in the last cycle of
974+
* the loop, to create its archive notification file soon. Otherwise WAL
975+
* archiving of the segment will be delayed until any data in the next
976+
* segment is received and written.
977+
*/
978+
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
979+
XLogWalRcvClose(recptr);
1001980
}
1002981

1003982
/*
@@ -1052,6 +1031,52 @@ XLogWalRcvFlush(bool dying)
10521031
}
10531032
}
10541033

1034+
/*
1035+
* Close the current segment.
1036+
*
1037+
* Flush the segment to disk before closing it. Otherwise we have to
1038+
* reopen and fsync it later.
1039+
*
1040+
* Create an archive notification file since the segment is known completed.
*
* recptr is used only by the assertion below: the caller must have a
* segment open (recvFile >= 0) and recptr must lie outside the segment
* identified by recvSegNo.
*
* A failure of close() is reported at PANIC level. On return, recvFile
* is reset to -1.
1041+
*/
1042+
static void
1043+
XLogWalRcvClose(XLogRecPtr recptr)
1044+
{
1045+
char xlogfname[MAXFNAMELEN];
1046+
1047+
/* Only a segment that recptr has already moved past may be closed. */
Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1048+
1049+
/*
1050+
* fsync() and close current file before we switch to next one. We would
1051+
* otherwise have to reopen this file to fsync it later.
1052+
*/
1053+
XLogWalRcvFlush(false);
1054+
1055+
/* Build the segment's file name; used in the error message and below. */
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1056+
1057+
/*
1058+
* XLOG segment files will be re-read by recovery in startup process soon,
1059+
* so we don't advise the OS to release cache pages associated with the
1060+
* file like XLogFileClose() does.
1061+
*/
1062+
if (close(recvFile) != 0)
1063+
ereport(PANIC,
1064+
(errcode_for_file_access(),
1065+
errmsg("could not close log segment %s: %m",
1066+
xlogfname)));
1067+
1068+
/*
1069+
* Unless XLogArchiveMode is ARCHIVE_MODE_ALWAYS, create .done file
* forcibly to prevent the streamed segment from being
1070+
* archived later. Otherwise call XLogArchiveNotify() for the segment
* (presumably so that it gets archived -- confirm against that function).
1071+
*/
1072+
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1073+
XLogArchiveForceDone(xlogfname);
1074+
else
1075+
XLogArchiveNotify(xlogfname);
1076+
1077+
/* No segment is open any more; XLogWalRcvWrite() opens one when recvFile < 0. */
recvFile = -1;
1078+
}
1079+
10551080
/*
10561081
* Send reply message to primary, indicating our current WAL locations, oldest
10571082
* xmin and the current time.

0 commit comments

Comments
 (0)