Skip to content

Commit f6cea45

Browse files
committed
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted all still existing files can get archived again. With a high wal_keep_segment settings that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup (for -X stream) itself to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes show pretty clearly that it needs some refactoring, but that'd result in not be backpatchable changes. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
1 parent bb2e2ce commit f6cea45

File tree

5 files changed

+115
-21
lines changed

5 files changed

+115
-21
lines changed

src/backend/replication/basebackup.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
404404
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
405405
}
406406

407+
/* send the WAL file itself */
407408
_tarWriteHeader(pathbuf, NULL, &statbuf);
408409

409410
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@@ -428,7 +429,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
428429
}
429430

430431
/* XLogSegSize is a multiple of 512, so no need for padding */
432+
431433
FreeFile(fp);
434+
435+
/*
436+
* Mark file as archived, otherwise files can get archived again
437+
* after promotion of a new node. This is in line with
438+
* walreceiver.c always doing a XLogArchiveForceDone() after a
439+
* complete segment.
440+
*/
441+
StatusFilePath(pathbuf, walFiles[i], ".done");
442+
sendFileWithContent(pathbuf, "");
432443
}
433444

434445
/*
@@ -452,6 +463,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
452463
errmsg("could not stat file \"%s\": %m", pathbuf)));
453464

454465
sendFile(pathbuf, pathbuf, &statbuf, false);
466+
467+
/* unconditionally mark file as archived */
468+
StatusFilePath(pathbuf, fname, ".done");
469+
sendFileWithContent(pathbuf, "");
455470
}
456471

457472
/* Send CopyDone message for the last tar file */
@@ -900,6 +915,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
900915
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
901916
}
902917
size += 512; /* Size of the header just added */
918+
919+
/*
920+
* Also send archive_status directory (by hackishly reusing
921+
* statbuf from above ...).
922+
*/
923+
if (!sizeonly)
924+
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
925+
size += 512; /* Size of the header just added */
926+
903927
continue; /* don't recurse into pg_xlog */
904928
}
905929

src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <zlib.h>
2929
#endif
3030

31+
#include "common/string.h"
3132
#include "getopt_long.h"
3233

3334
#include "receivelog.h"
@@ -266,7 +267,7 @@ LogStreamerMain(logstreamer_param *param)
266267
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
267268
param->sysidentifier, param->xlogdir,
268269
reached_end_position, standby_message_timeout,
269-
NULL))
270+
NULL, true))
270271

271272
/*
272273
* Any errors will already have been reported in the function process,
@@ -290,6 +291,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
290291
logstreamer_param *param;
291292
uint32 hi,
292293
lo;
294+
char statusdir[MAXPGPATH];
293295

294296
param = pg_malloc0(sizeof(logstreamer_param));
295297
param->timeline = timeline;
@@ -324,13 +326,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
324326
/* Error message already written in GetConnection() */
325327
exit(1);
326328

329+
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
330+
327331
/*
328-
* Always in plain format, so we can write to basedir/pg_xlog. But the
329-
* directory entry in the tar file may arrive later, so make sure it's
330-
* created before we start.
332+
* Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
333+
* basedir/pg_xlog as the directory entry in the tar file may arrive
334+
* later.
331335
*/
332-
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
333-
verify_dir_is_empty_or_create(param->xlogdir);
336+
snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
337+
basedir);
338+
339+
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
340+
{
341+
fprintf(stderr,
342+
_("%s: could not create directory \"%s\": %s\n"),
343+
progname, statusdir, strerror(errno));
344+
disconnect_and_exit(1);
345+
}
334346

335347
/*
336348
* Start a child process and tell it to start streaming. On Unix, this is
@@ -1003,10 +1015,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
10031015
{
10041016
/*
10051017
* When streaming WAL, pg_xlog will have been created
1006-
* by the wal receiver process, so just ignore failure
1007-
* on that.
1018+
* by the wal receiver process. So just ignore creation
1019+
* failures on related directories.
10081020
*/
1009-
if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
1021+
if (!((pg_str_endswith(filename, "/pg_xlog") ||
1022+
pg_str_endswith(filename, "/archive_status")) &&
1023+
errno == EEXIST))
10101024
{
10111025
fprintf(stderr,
10121026
_("%s: could not create directory \"%s\": %s\n"),

src/bin/pg_basebackup/pg_receivexlog.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,8 @@ StreamLog(void)
335335
starttli);
336336

337337
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
338-
stop_streaming, standby_message_timeout, ".partial");
338+
stop_streaming, standby_message_timeout, ".partial",
339+
false);
339340

340341
PQfinish(conn);
341342
}

src/bin/pg_basebackup/receivelog.c

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,41 @@ static char current_walfile_name[MAXPGPATH] = "";
3535
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
3636
uint32 timeline, char *basedir,
3737
stream_stop_callback stream_stop, int standby_message_timeout,
38-
char *partial_suffix, XLogRecPtr *stoppos);
38+
char *partial_suffix, XLogRecPtr *stoppos,
39+
bool mark_done);
3940

4041
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
4142
uint32 *timeline);
4243

44+
static bool
45+
mark_file_as_archived(const char *basedir, const char *fname)
46+
{
47+
int fd;
48+
static char tmppath[MAXPGPATH];
49+
50+
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
51+
basedir, fname);
52+
53+
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
54+
if (fd < 0)
55+
{
56+
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
57+
progname, tmppath, strerror(errno));
58+
return false;
59+
}
60+
61+
if (fsync(fd) != 0)
62+
{
63+
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
64+
progname, tmppath, strerror(errno));
65+
return false;
66+
}
67+
68+
close(fd);
69+
70+
return true;
71+
}
72+
4373
/*
4474
* Open a new WAL file in the specified directory.
4575
*
@@ -133,7 +163,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
133163
* and returns false, otherwise returns true.
134164
*/
135165
static bool
136-
close_walfile(char *basedir, char *partial_suffix)
166+
close_walfile(char *basedir, char *partial_suffix, bool mark_done)
137167
{
138168
off_t currpos;
139169

@@ -187,6 +217,19 @@ close_walfile(char *basedir, char *partial_suffix)
187217
_("%s: not renaming \"%s%s\", segment is not complete\n"),
188218
progname, current_walfile_name, partial_suffix);
189219

220+
/*
221+
* Mark file as archived if requested by the caller - pg_basebackup needs
222+
* to do so as files can otherwise get archived again after promotion of a
223+
* new node. This is in line with walreceiver.c always doing a
224+
* XLogArchiveForceDone() after a complete segment.
225+
*/
226+
if (currpos == XLOG_SEG_SIZE && mark_done)
227+
{
228+
/* writes error message if failed */
229+
if (!mark_file_as_archived(basedir, current_walfile_name))
230+
return false;
231+
}
232+
190233
return true;
191234
}
192235

@@ -285,7 +328,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
285328
}
286329

287330
static bool
288-
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
331+
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
332+
char *content, bool mark_done)
289333
{
290334
int size = strlen(content);
291335
char path[MAXPGPATH];
@@ -364,6 +408,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
364408
return false;
365409
}
366410

411+
/* Maintain archive_status, check close_walfile() for details. */
412+
if (mark_done)
413+
{
414+
/* writes error message if failed */
415+
if (!mark_file_as_archived(basedir, histfname))
416+
return false;
417+
}
418+
367419
return true;
368420
}
369421

@@ -508,7 +560,8 @@ bool
508560
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
509561
char *sysidentifier, char *basedir,
510562
stream_stop_callback stream_stop,
511-
int standby_message_timeout, char *partial_suffix)
563+
int standby_message_timeout, char *partial_suffix,
564+
bool mark_done)
512565
{
513566
char query[128];
514567
PGresult *res;
@@ -593,7 +646,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
593646
/* Write the history file to disk */
594647
writeTimeLineHistoryFile(basedir, timeline,
595648
PQgetvalue(res, 0, 0),
596-
PQgetvalue(res, 0, 1));
649+
PQgetvalue(res, 0, 1),
650+
mark_done);
597651

598652
PQclear(res);
599653
}
@@ -622,7 +676,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
622676
/* Stream the WAL */
623677
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
624678
standby_message_timeout, partial_suffix,
625-
&stoppos);
679+
&stoppos, mark_done);
626680
if (res == NULL)
627681
goto error;
628682

@@ -787,7 +841,7 @@ static PGresult *
787841
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
788842
char *basedir, stream_stop_callback stream_stop,
789843
int standby_message_timeout, char *partial_suffix,
790-
XLogRecPtr *stoppos)
844+
XLogRecPtr *stoppos, bool mark_done)
791845
{
792846
char *copybuf = NULL;
793847
int64 last_status = -1;
@@ -814,7 +868,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
814868
*/
815869
if (still_sending && stream_stop(blockpos, timeline, false))
816870
{
817-
if (!close_walfile(basedir, partial_suffix))
871+
if (!close_walfile(basedir, partial_suffix, mark_done))
818872
{
819873
/* Potential error message is written by close_walfile */
820874
goto error;
@@ -913,7 +967,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
913967
*/
914968
if (still_sending)
915969
{
916-
if (!close_walfile(basedir, partial_suffix))
970+
if (!close_walfile(basedir, partial_suffix, mark_done))
917971
{
918972
/* Error message written in close_walfile() */
919973
PQclear(res);
@@ -1081,7 +1135,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
10811135
/* Did we reach the end of a WAL segment? */
10821136
if (blockpos % XLOG_SEG_SIZE == 0)
10831137
{
1084-
if (!close_walfile(basedir, partial_suffix))
1138+
if (!close_walfile(basedir, partial_suffix, mark_done))
10851139
/* Error message written in close_walfile() */
10861140
goto error;
10871141

src/bin/pg_basebackup/receivelog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
1414
char *basedir,
1515
stream_stop_callback stream_stop,
1616
int standby_message_timeout,
17-
char *partial_suffix);
17+
char *partial_suffix,
18+
bool mark_done);

0 commit comments

Comments
 (0)