@@ -34,11 +34,41 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
34
34
static PGresult * HandleCopyStream (PGconn * conn , XLogRecPtr startpos ,
35
35
uint32 timeline , char * basedir ,
36
36
stream_stop_callback stream_stop , int standby_message_timeout ,
37
- char * partial_suffix , XLogRecPtr * stoppos );
37
+ char * partial_suffix , XLogRecPtr * stoppos ,
38
+ bool mark_done );
38
39
39
40
static bool ReadEndOfStreamingResult (PGresult * res , XLogRecPtr * startpos ,
40
41
uint32 * timeline );
41
42
43
+ static bool
44
+ mark_file_as_archived (const char * basedir , const char * fname )
45
+ {
46
+ int fd ;
47
+ static char tmppath [MAXPGPATH ];
48
+
49
+ snprintf (tmppath , sizeof (tmppath ), "%s/archive_status/%s.done" ,
50
+ basedir , fname );
51
+
52
+ fd = open (tmppath , O_WRONLY | O_CREAT | PG_BINARY , S_IRUSR | S_IWUSR );
53
+ if (fd < 0 )
54
+ {
55
+ fprintf (stderr , _ ("%s: could not create archive status file \"%s\": %s\n" ),
56
+ progname , tmppath , strerror (errno ));
57
+ return false;
58
+ }
59
+
60
+ if (fsync (fd ) != 0 )
61
+ {
62
+ fprintf (stderr , _ ("%s: could not fsync file \"%s\": %s\n" ),
63
+ progname , tmppath , strerror (errno ));
64
+ return false;
65
+ }
66
+
67
+ close (fd );
68
+
69
+ return true;
70
+ }
71
+
42
72
/*
43
73
* Open a new WAL file in the specified directory.
44
74
*
@@ -132,7 +162,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
132
162
* and returns false, otherwise returns true.
133
163
*/
134
164
static bool
135
- close_walfile (char * basedir , char * partial_suffix , XLogRecPtr pos )
165
+ close_walfile (char * basedir , char * partial_suffix , XLogRecPtr pos , bool mark_done )
136
166
{
137
167
off_t currpos ;
138
168
@@ -186,6 +216,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
186
216
_ ("%s: not renaming \"%s%s\", segment is not complete\n" ),
187
217
progname , current_walfile_name , partial_suffix );
188
218
219
+ /*
220
+ * Mark file as archived if requested by the caller - pg_basebackup needs
221
+ * to do so as files can otherwise get archived again after promotion of a
222
+ * new node. This is in line with walreceiver.c always doing a
223
+ * XLogArchiveForceDone() after a complete segment.
224
+ */
225
+ if (currpos == XLOG_SEG_SIZE && mark_done )
226
+ {
227
+ /* writes error message if failed */
228
+ if (!mark_file_as_archived (basedir , current_walfile_name ))
229
+ return false;
230
+ }
231
+
189
232
lastFlushPosition = pos ;
190
233
return true;
191
234
}
@@ -228,7 +271,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
228
271
}
229
272
230
273
static bool
231
- writeTimeLineHistoryFile (char * basedir , TimeLineID tli , char * filename , char * content )
274
+ writeTimeLineHistoryFile (char * basedir , TimeLineID tli , char * filename ,
275
+ char * content , bool mark_done )
232
276
{
233
277
int size = strlen (content );
234
278
char path [MAXPGPATH ];
@@ -307,6 +351,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
307
351
return false;
308
352
}
309
353
354
+ /* Maintain archive_status, check close_walfile() for details. */
355
+ if (mark_done )
356
+ {
357
+ /* writes error message if failed */
358
+ if (!mark_file_as_archived (basedir , histfname ))
359
+ return false;
360
+ }
361
+
310
362
return true;
311
363
}
312
364
423
475
ReceiveXlogStream (PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
424
476
char * sysidentifier , char * basedir ,
425
477
stream_stop_callback stream_stop ,
426
- int standby_message_timeout , char * partial_suffix )
478
+ int standby_message_timeout , char * partial_suffix ,
479
+ bool mark_done )
427
480
{
428
481
char query [128 ];
429
482
char slotcmd [128 ];
@@ -538,7 +591,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
538
591
/* Write the history file to disk */
539
592
writeTimeLineHistoryFile (basedir , timeline ,
540
593
PQgetvalue (res , 0 , 0 ),
541
- PQgetvalue (res , 0 , 1 ));
594
+ PQgetvalue (res , 0 , 1 ),
595
+ mark_done );
542
596
543
597
PQclear (res );
544
598
}
@@ -568,7 +622,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
568
622
/* Stream the WAL */
569
623
res = HandleCopyStream (conn , startpos , timeline , basedir , stream_stop ,
570
624
standby_message_timeout , partial_suffix ,
571
- & stoppos );
625
+ & stoppos , mark_done );
572
626
if (res == NULL )
573
627
goto error ;
574
628
@@ -733,7 +787,7 @@ static PGresult *
733
787
HandleCopyStream (PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
734
788
char * basedir , stream_stop_callback stream_stop ,
735
789
int standby_message_timeout , char * partial_suffix ,
736
- XLogRecPtr * stoppos )
790
+ XLogRecPtr * stoppos , bool mark_done )
737
791
{
738
792
char * copybuf = NULL ;
739
793
int64 last_status = -1 ;
@@ -760,7 +814,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
760
814
*/
761
815
if (still_sending && stream_stop (blockpos , timeline , false))
762
816
{
763
- if (!close_walfile (basedir , partial_suffix , blockpos ))
817
+ if (!close_walfile (basedir , partial_suffix , blockpos , mark_done ))
764
818
{
765
819
/* Potential error message is written by close_walfile */
766
820
goto error ;
@@ -859,7 +913,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
859
913
*/
860
914
if (still_sending )
861
915
{
862
- if (!close_walfile (basedir , partial_suffix , blockpos ))
916
+ if (!close_walfile (basedir , partial_suffix , blockpos , mark_done ))
863
917
{
864
918
/* Error message written in close_walfile() */
865
919
PQclear (res );
@@ -1046,7 +1100,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
1046
1100
/* Did we reach the end of a WAL segment? */
1047
1101
if (blockpos % XLOG_SEG_SIZE == 0 )
1048
1102
{
1049
- if (!close_walfile (basedir , partial_suffix , blockpos ))
1103
+ if (!close_walfile (basedir , partial_suffix , blockpos , mark_done ))
1050
1104
/* Error message written in close_walfile() */
1051
1105
goto error ;
1052
1106
0 commit comments