Skip to content

Commit f961ad4

Browse files
committed
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted, all files that still exist can get archived again. With a high wal_keep_segments setting that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup itself (for -X stream) to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes shows pretty clearly that it needs some refactoring, but that would result in changes that are not backpatchable. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
1 parent 4967e07 commit f961ad4

File tree

5 files changed

+115
-16
lines changed

5 files changed

+115
-16
lines changed

src/backend/replication/basebackup.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
400400
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
401401
}
402402

403+
/* send the WAL file itself */
403404
_tarWriteHeader(pathbuf, NULL, &statbuf);
404405

405406
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@@ -424,7 +425,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
424425
}
425426

426427
/* XLogSegSize is a multiple of 512, so no need for padding */
428+
427429
FreeFile(fp);
430+
431+
/*
432+
* Mark file as archived, otherwise files can get archived again
433+
* after promotion of a new node. This is in line with
434+
* walreceiver.c always doing a XLogArchiveForceDone() after a
435+
* complete segment.
436+
*/
437+
StatusFilePath(pathbuf, walFiles[i], ".done");
438+
sendFileWithContent(pathbuf, "");
428439
}
429440

430441
/*
@@ -447,6 +458,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
447458
errmsg("could not stat file \"%s\": %m", pathbuf)));
448459

449460
sendFile(pathbuf, pathbuf, &statbuf, false);
461+
462+
/* unconditionally mark file as archived */
463+
StatusFilePath(pathbuf, fname, ".done");
464+
sendFileWithContent(pathbuf, "");
450465
}
451466

452467
/* Send CopyDone message for the last tar file */
@@ -881,6 +896,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
881896
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
882897
}
883898
size += 512; /* Size of the header just added */
899+
900+
/*
901+
* Also send archive_status directory (by hackishly reusing
902+
* statbuf from above ...).
903+
*/
904+
if (!sizeonly)
905+
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
906+
size += 512; /* Size of the header just added */
907+
884908
continue; /* don't recurse into pg_xlog */
885909
}
886910

src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ LogStreamerMain(logstreamer_param *param)
259259
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
260260
param->sysidentifier, param->xlogdir,
261261
reached_end_position, standby_message_timeout,
262-
true))
262+
true, true))
263263

264264
/*
265265
* Any errors will already have been reported in the function process,
@@ -281,6 +281,7 @@ static void
281281
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
282282
{
283283
logstreamer_param *param;
284+
char statusdir[MAXPGPATH];
284285

285286
param = xmalloc0(sizeof(logstreamer_param));
286287
param->timeline = timeline;
@@ -314,13 +315,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
314315
/* Error message already written in GetConnection() */
315316
exit(1);
316317

318+
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
319+
317320
/*
318-
* Always in plain format, so we can write to basedir/pg_xlog. But the
319-
* directory entry in the tar file may arrive later, so make sure it's
320-
* created before we start.
321+
* Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
322+
* basedir/pg_xlog as the directory entry in the tar file may arrive
323+
* later.
321324
*/
322-
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
323-
verify_dir_is_empty_or_create(param->xlogdir);
325+
snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
326+
basedir);
327+
328+
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
329+
{
330+
fprintf(stderr,
331+
_("%s: could not create directory \"%s\": %s\n"),
332+
progname, statusdir, strerror(errno));
333+
disconnect_and_exit(1);
334+
}
324335

325336
/*
326337
* Start a child process and tell it to start streaming. On Unix, this is
@@ -403,6 +414,23 @@ verify_dir_is_empty_or_create(char *dirname)
403414
}
404415
}
405416

417+
/*
 * pg_str_endswith
 *
 * Report whether `end' is a suffix of `str'.
 *
 * Both arguments must be NUL-terminated strings.  An empty `end' is a
 * suffix of every string; an `end' longer than `str' never matches.
 */
static bool
pg_str_endswith(const char *str, const char *end)
{
	const size_t total = strlen(str);
	const size_t suffix = strlen(end);
	const char *tail;

	/* a suffix cannot be longer than the string it terminates */
	if (total < suffix)
		return false;

	/* line the suffix up with the last `suffix' bytes of str and compare */
	tail = str + (total - suffix);
	return strcmp(tail, end) == 0;
}
406434

407435
/*
408436
* Print a progress report based on the global variables. If verbose output
@@ -835,10 +863,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
835863
{
836864
/*
837865
* When streaming WAL, pg_xlog will have been created
838-
* by the wal receiver process, so just ignore failure
839-
* on that.
866+
* by the wal receiver process. So just ignore creation
867+
* failures on related directories.
840868
*/
841-
if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
869+
if (!((pg_str_endswith(filename, "/pg_xlog") ||
870+
pg_str_endswith(filename, "/archive_status")) &&
871+
errno == EEXIST))
842872
{
843873
fprintf(stderr,
844874
_("%s: could not create directory \"%s\": %s\n"),

src/bin/pg_basebackup/pg_receivexlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ StreamLog(void)
321321
progname, startpos.xlogid, startpos.xrecoff, timeline);
322322

323323
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
324-
stop_streaming, standby_message_timeout, false);
324+
stop_streaming, standby_message_timeout, false, true);
325325

326326
PQfinish(conn);
327327
}

src/bin/pg_basebackup/receivelog.c

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,35 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
4444
static int walfile = -1;
4545

4646

47+
static bool
48+
mark_file_as_archived(const char *basedir, const char *fname)
49+
{
50+
int fd;
51+
static char tmppath[MAXPGPATH];
52+
53+
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
54+
basedir, fname);
55+
56+
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
57+
if (fd < 0)
58+
{
59+
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
60+
progname, tmppath, strerror(errno));
61+
return false;
62+
}
63+
64+
if (fsync(fd) != 0)
65+
{
66+
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
67+
progname, tmppath, strerror(errno));
68+
return false;
69+
}
70+
71+
close(fd);
72+
73+
return true;
74+
}
75+
4776
/*
4877
* Open a new WAL file in the specified directory. Store the name
4978
* (not including the full directory) in namebuf. Assumes there is
@@ -133,7 +162,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
133162
* completed writing the whole segment.
134163
*/
135164
static bool
136-
close_walfile(char *basedir, char *walname, bool segment_complete)
165+
close_walfile(char *basedir, char *walname, bool segment_complete,
166+
bool mark_done)
137167
{
138168
off_t currpos = lseek(walfile, 0, SEEK_CUR);
139169

@@ -184,6 +214,19 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
184214
_("%s: not renaming \"%s\", segment is not complete\n"),
185215
progname, walname);
186216

217+
/*
218+
* Mark file as archived if requested by the caller - pg_basebackup needs
219+
* to do so as files can otherwise get archived again after promotion of a
220+
* new node. This is in line with walreceiver.c always doing a
221+
* XLogArchiveForceDone() after a complete segment.
222+
*/
223+
if (currpos == XLOG_SEG_SIZE && mark_done)
224+
{
225+
/* writes error message if failed */
226+
if (!mark_file_as_archived(basedir, walname))
227+
return false;
228+
}
229+
187230
return true;
188231
}
189232

@@ -284,7 +327,8 @@ bool
284327
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
285328
char *sysidentifier, char *basedir,
286329
stream_stop_callback stream_stop,
287-
int standby_message_timeout, bool rename_partial)
330+
int standby_message_timeout, bool rename_partial,
331+
bool mark_done)
288332
{
289333
char query[128];
290334
char current_walfile_name[MAXPGPATH];
@@ -343,7 +387,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
343387
return false;
344388
}
345389
PQclear(res);
346-
347390
/*
348391
* Receive the actual xlog data
349392
*/
@@ -367,7 +410,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
367410
if (stream_stop && stream_stop(blockpos, timeline, false))
368411
{
369412
if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
370-
rename_partial))
413+
rename_partial, mark_done))
371414
/* Potential error message is written by close_walfile */
372415
goto error;
373416
return true;
@@ -579,7 +622,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
579622
/* Did we reach the end of a WAL segment? */
580623
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
581624
{
582-
if (!close_walfile(basedir, current_walfile_name, false))
625+
if (!close_walfile(basedir, current_walfile_name, false,
626+
mark_done))
583627
/* Error message written in close_walfile() */
584628
goto error;
585629

src/bin/pg_basebackup/receivelog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
1313
char *basedir,
1414
stream_stop_callback stream_stop,
1515
int standby_message_timeout,
16-
bool rename_partial);
16+
bool rename_partial,
17+
bool mark_done);

0 commit comments

Comments
 (0)