@@ -35,11 +35,41 @@ static char current_walfile_name[MAXPGPATH] = "";
35
35
static PGresult * HandleCopyStream (PGconn * conn , XLogRecPtr startpos ,
36
36
uint32 timeline , char * basedir ,
37
37
stream_stop_callback stream_stop , int standby_message_timeout ,
38
- char * partial_suffix , XLogRecPtr * stoppos );
38
+ char * partial_suffix , XLogRecPtr * stoppos ,
39
+ bool mark_done );
39
40
40
41
static bool ReadEndOfStreamingResult (PGresult * res , XLogRecPtr * startpos ,
41
42
uint32 * timeline );
42
43
44
+ static bool
45
+ mark_file_as_archived (const char * basedir , const char * fname )
46
+ {
47
+ int fd ;
48
+ static char tmppath [MAXPGPATH ];
49
+
50
+ snprintf (tmppath , sizeof (tmppath ), "%s/archive_status/%s.done" ,
51
+ basedir , fname );
52
+
53
+ fd = open (tmppath , O_WRONLY | O_CREAT | PG_BINARY , S_IRUSR | S_IWUSR );
54
+ if (fd < 0 )
55
+ {
56
+ fprintf (stderr , _ ("%s: could not create archive status file \"%s\": %s\n" ),
57
+ progname , tmppath , strerror (errno ));
58
+ return false;
59
+ }
60
+
61
+ if (fsync (fd ) != 0 )
62
+ {
63
+ fprintf (stderr , _ ("%s: could not fsync file \"%s\": %s\n" ),
64
+ progname , tmppath , strerror (errno ));
65
+ return false;
66
+ }
67
+
68
+ close (fd );
69
+
70
+ return true;
71
+ }
72
+
43
73
/*
44
74
* Open a new WAL file in the specified directory.
45
75
*
@@ -133,7 +163,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
133
163
* and returns false, otherwise returns true.
134
164
*/
135
165
static bool
136
- close_walfile (char * basedir , char * partial_suffix )
166
+ close_walfile (char * basedir , char * partial_suffix , bool mark_done )
137
167
{
138
168
off_t currpos ;
139
169
@@ -187,6 +217,19 @@ close_walfile(char *basedir, char *partial_suffix)
187
217
_ ("%s: not renaming \"%s%s\", segment is not complete\n" ),
188
218
progname , current_walfile_name , partial_suffix );
189
219
220
+ /*
221
+ * Mark file as archived if requested by the caller - pg_basebackup needs
222
+ * to do so as files can otherwise get archived again after promotion of a
223
+ * new node. This is in line with walreceiver.c always doing a
224
+ * XLogArchiveForceDone() after a complete segment.
225
+ */
226
+ if (currpos == XLOG_SEG_SIZE && mark_done )
227
+ {
228
+ /* writes error message if failed */
229
+ if (!mark_file_as_archived (basedir , current_walfile_name ))
230
+ return false;
231
+ }
232
+
190
233
return true;
191
234
}
192
235
@@ -285,7 +328,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
285
328
}
286
329
287
330
static bool
288
- writeTimeLineHistoryFile (char * basedir , TimeLineID tli , char * filename , char * content )
331
+ writeTimeLineHistoryFile (char * basedir , TimeLineID tli , char * filename ,
332
+ char * content , bool mark_done )
289
333
{
290
334
int size = strlen (content );
291
335
char path [MAXPGPATH ];
@@ -364,6 +408,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
364
408
return false;
365
409
}
366
410
411
+ /* Maintain archive_status, check close_walfile() for details. */
412
+ if (mark_done )
413
+ {
414
+ /* writes error message if failed */
415
+ if (!mark_file_as_archived (basedir , histfname ))
416
+ return false;
417
+ }
418
+
367
419
return true;
368
420
}
369
421
508
560
ReceiveXlogStream (PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
509
561
char * sysidentifier , char * basedir ,
510
562
stream_stop_callback stream_stop ,
511
- int standby_message_timeout , char * partial_suffix )
563
+ int standby_message_timeout , char * partial_suffix ,
564
+ bool mark_done )
512
565
{
513
566
char query [128 ];
514
567
PGresult * res ;
@@ -593,7 +646,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
593
646
/* Write the history file to disk */
594
647
writeTimeLineHistoryFile (basedir , timeline ,
595
648
PQgetvalue (res , 0 , 0 ),
596
- PQgetvalue (res , 0 , 1 ));
649
+ PQgetvalue (res , 0 , 1 ),
650
+ mark_done );
597
651
598
652
PQclear (res );
599
653
}
@@ -622,7 +676,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
622
676
/* Stream the WAL */
623
677
res = HandleCopyStream (conn , startpos , timeline , basedir , stream_stop ,
624
678
standby_message_timeout , partial_suffix ,
625
- & stoppos );
679
+ & stoppos , mark_done );
626
680
if (res == NULL )
627
681
goto error ;
628
682
@@ -787,7 +841,7 @@ static PGresult *
787
841
HandleCopyStream (PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
788
842
char * basedir , stream_stop_callback stream_stop ,
789
843
int standby_message_timeout , char * partial_suffix ,
790
- XLogRecPtr * stoppos )
844
+ XLogRecPtr * stoppos , bool mark_done )
791
845
{
792
846
char * copybuf = NULL ;
793
847
int64 last_status = -1 ;
@@ -814,7 +868,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
814
868
*/
815
869
if (still_sending && stream_stop (blockpos , timeline , false))
816
870
{
817
- if (!close_walfile (basedir , partial_suffix ))
871
+ if (!close_walfile (basedir , partial_suffix , mark_done ))
818
872
{
819
873
/* Potential error message is written by close_walfile */
820
874
goto error ;
@@ -913,7 +967,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
913
967
*/
914
968
if (still_sending )
915
969
{
916
- if (!close_walfile (basedir , partial_suffix ))
970
+ if (!close_walfile (basedir , partial_suffix , mark_done ))
917
971
{
918
972
/* Error message written in close_walfile() */
919
973
PQclear (res );
@@ -1081,7 +1135,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
1081
1135
/* Did we reach the end of a WAL segment? */
1082
1136
if (blockpos % XLOG_SEG_SIZE == 0 )
1083
1137
{
1084
- if (!close_walfile (basedir , partial_suffix ))
1138
+ if (!close_walfile (basedir , partial_suffix , mark_done ))
1085
1139
/* Error message written in close_walfile() */
1086
1140
goto error ;
1087
1141
0 commit comments