Skip to content

Commit 90e4a2b

Browse files
committed
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted, all files that still exist can get archived again. With a high wal_keep_segments setting that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup (for -X stream) itself to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes shows pretty clearly that it needs some refactoring, but that'd result in changes that are not backpatchable. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
1 parent 70e36ad commit 90e4a2b

File tree

5 files changed

+115
-22
lines changed

5 files changed

+115
-22
lines changed

src/backend/replication/basebackup.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
471471
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
472472
}
473473

474+
/* send the WAL file itself */
474475
_tarWriteHeader(pathbuf, NULL, &statbuf);
475476

476477
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
497498
}
498499

499500
/* XLogSegSize is a multiple of 512, so no need for padding */
501+
500502
FreeFile(fp);
503+
504+
/*
505+
* Mark file as archived, otherwise files can get archived again
506+
* after promotion of a new node. This is in line with
507+
* walreceiver.c always doing a XLogArchiveForceDone() after a
508+
* complete segment.
509+
*/
510+
StatusFilePath(pathbuf, walFiles[i], ".done");
511+
sendFileWithContent(pathbuf, "");
501512
}
502513

503514
/*
@@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
521532
errmsg("could not stat file \"%s\": %m", pathbuf)));
522533

523534
sendFile(pathbuf, pathbuf, &statbuf, false);
535+
536+
/* unconditionally mark file as archived */
537+
StatusFilePath(pathbuf, fname, ".done");
538+
sendFileWithContent(pathbuf, "");
524539
}
525540

526541
/* Send CopyDone message for the last tar file */
@@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
10211036
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
10221037
}
10231038
size += 512; /* Size of the header just added */
1039+
1040+
/*
1041+
* Also send archive_status directory (by hackishly reusing
1042+
* statbuf from above ...).
1043+
*/
1044+
if (!sizeonly)
1045+
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
1046+
size += 512; /* Size of the header just added */
1047+
10241048
continue; /* don't recurse into pg_xlog */
10251049
}
10261050

src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <zlib.h>
2626
#endif
2727

28+
#include "common/string.h"
2829
#include "getopt_long.h"
2930
#include "libpq-fe.h"
3031
#include "pqexpbuffer.h"
@@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
370371
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
371372
param->sysidentifier, param->xlogdir,
372373
reached_end_position, standby_message_timeout,
373-
NULL))
374+
NULL, true))
374375

375376
/*
376377
* Any errors will already have been reported in the function process,
@@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
394395
logstreamer_param *param;
395396
uint32 hi,
396397
lo;
398+
char statusdir[MAXPGPATH];
397399

398400
param = pg_malloc0(sizeof(logstreamer_param));
399401
param->timeline = timeline;
@@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
428430
/* Error message already written in GetConnection() */
429431
exit(1);
430432

433+
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
434+
431435
/*
432-
* Always in plain format, so we can write to basedir/pg_xlog. But the
433-
* directory entry in the tar file may arrive later, so make sure it's
434-
* created before we start.
436+
* Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
437+
* basedir/pg_xlog as the directory entry in the tar file may arrive
438+
* later.
435439
*/
436-
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
437-
verify_dir_is_empty_or_create(param->xlogdir);
440+
snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
441+
basedir);
442+
443+
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
444+
{
445+
fprintf(stderr,
446+
_("%s: could not create directory \"%s\": %s\n"),
447+
progname, statusdir, strerror(errno));
448+
disconnect_and_exit(1);
449+
}
438450

439451
/*
440452
* Start a child process and tell it to start streaming. On Unix, this is
@@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
12361248
* by the wal receiver process. Also, when transaction
12371249
* log directory location was specified, pg_xlog has
12381250
* already been created as a symbolic link before
1239-
* starting the actual backup. So just ignore failure
1240-
* on them.
1251+
* starting the actual backup. So just ignore creation
1252+
* failures on related directories.
12411253
*/
1242-
if ((!streamwal && (strcmp(xlog_dir, "") == 0))
1243-
|| strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
1254+
if (!((pg_str_endswith(filename, "/pg_xlog") ||
1255+
pg_str_endswith(filename, "/archive_status")) &&
1256+
errno == EEXIST))
12441257
{
12451258
fprintf(stderr,
12461259
_("%s: could not create directory \"%s\": %s\n"),

src/bin/pg_basebackup/pg_receivexlog.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,8 @@ StreamLog(void)
330330
starttli);
331331

332332
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
333-
stop_streaming, standby_message_timeout, ".partial");
333+
stop_streaming, standby_message_timeout, ".partial",
334+
false);
334335

335336
PQfinish(conn);
336337
}

src/bin/pg_basebackup/receivelog.c

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,41 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
3434
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
3535
uint32 timeline, char *basedir,
3636
stream_stop_callback stream_stop, int standby_message_timeout,
37-
char *partial_suffix, XLogRecPtr *stoppos);
37+
char *partial_suffix, XLogRecPtr *stoppos,
38+
bool mark_done);
3839

3940
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
4041
uint32 *timeline);
4142

43+
static bool
44+
mark_file_as_archived(const char *basedir, const char *fname)
45+
{
46+
int fd;
47+
static char tmppath[MAXPGPATH];
48+
49+
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
50+
basedir, fname);
51+
52+
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
53+
if (fd < 0)
54+
{
55+
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
56+
progname, tmppath, strerror(errno));
57+
return false;
58+
}
59+
60+
if (fsync(fd) != 0)
61+
{
62+
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
63+
progname, tmppath, strerror(errno));
64+
return false;
65+
}
66+
67+
close(fd);
68+
69+
return true;
70+
}
71+
4272
/*
4373
* Open a new WAL file in the specified directory.
4474
*
@@ -132,7 +162,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
132162
* and returns false, otherwise returns true.
133163
*/
134164
static bool
135-
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
165+
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
136166
{
137167
off_t currpos;
138168

@@ -186,6 +216,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
186216
_("%s: not renaming \"%s%s\", segment is not complete\n"),
187217
progname, current_walfile_name, partial_suffix);
188218

219+
/*
220+
* Mark file as archived if requested by the caller - pg_basebackup needs
221+
* to do so as files can otherwise get archived again after promotion of a
222+
* new node. This is in line with walreceiver.c always doing a
223+
* XLogArchiveForceDone() after a complete segment.
224+
*/
225+
if (currpos == XLOG_SEG_SIZE && mark_done)
226+
{
227+
/* writes error message if failed */
228+
if (!mark_file_as_archived(basedir, current_walfile_name))
229+
return false;
230+
}
231+
189232
lastFlushPosition = pos;
190233
return true;
191234
}
@@ -228,7 +271,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
228271
}
229272

230273
static bool
231-
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
274+
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
275+
char *content, bool mark_done)
232276
{
233277
int size = strlen(content);
234278
char path[MAXPGPATH];
@@ -307,6 +351,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
307351
return false;
308352
}
309353

354+
/* Maintain archive_status, check close_walfile() for details. */
355+
if (mark_done)
356+
{
357+
/* writes error message if failed */
358+
if (!mark_file_as_archived(basedir, histfname))
359+
return false;
360+
}
361+
310362
return true;
311363
}
312364

@@ -423,7 +475,8 @@ bool
423475
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
424476
char *sysidentifier, char *basedir,
425477
stream_stop_callback stream_stop,
426-
int standby_message_timeout, char *partial_suffix)
478+
int standby_message_timeout, char *partial_suffix,
479+
bool mark_done)
427480
{
428481
char query[128];
429482
char slotcmd[128];
@@ -538,7 +591,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
538591
/* Write the history file to disk */
539592
writeTimeLineHistoryFile(basedir, timeline,
540593
PQgetvalue(res, 0, 0),
541-
PQgetvalue(res, 0, 1));
594+
PQgetvalue(res, 0, 1),
595+
mark_done);
542596

543597
PQclear(res);
544598
}
@@ -568,7 +622,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
568622
/* Stream the WAL */
569623
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
570624
standby_message_timeout, partial_suffix,
571-
&stoppos);
625+
&stoppos, mark_done);
572626
if (res == NULL)
573627
goto error;
574628

@@ -733,7 +787,7 @@ static PGresult *
733787
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
734788
char *basedir, stream_stop_callback stream_stop,
735789
int standby_message_timeout, char *partial_suffix,
736-
XLogRecPtr *stoppos)
790+
XLogRecPtr *stoppos, bool mark_done)
737791
{
738792
char *copybuf = NULL;
739793
int64 last_status = -1;
@@ -760,7 +814,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
760814
*/
761815
if (still_sending && stream_stop(blockpos, timeline, false))
762816
{
763-
if (!close_walfile(basedir, partial_suffix, blockpos))
817+
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
764818
{
765819
/* Potential error message is written by close_walfile */
766820
goto error;
@@ -859,7 +913,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
859913
*/
860914
if (still_sending)
861915
{
862-
if (!close_walfile(basedir, partial_suffix, blockpos))
916+
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
863917
{
864918
/* Error message written in close_walfile() */
865919
PQclear(res);
@@ -1046,7 +1100,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
10461100
/* Did we reach the end of a WAL segment? */
10471101
if (blockpos % XLOG_SEG_SIZE == 0)
10481102
{
1049-
if (!close_walfile(basedir, partial_suffix, blockpos))
1103+
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
10501104
/* Error message written in close_walfile() */
10511105
goto error;
10521106

src/bin/pg_basebackup/receivelog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
1616
char *basedir,
1717
stream_stop_callback stream_stop,
1818
int standby_message_timeout,
19-
char *partial_suffix);
19+
char *partial_suffix,
20+
bool mark_done);

0 commit comments

Comments
 (0)