28
28
29
29
static int standby_message_timeout = 10 * 1000 ; /* 10 sec = default */
30
30
static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr ;
31
+
32
+ /*
33
+ * How long we should wait for streaming end in seconds.
34
+ * Retrieved as checkpoint_timeout + checkpoint_timeout * 0.1
35
+ */
36
+ static uint32 stream_stop_timeout = 0 ;
37
+ /* Time in which we started to wait for streaming end */
38
+ static time_t stream_stop_begin = 0 ;
39
+
31
40
const char * progname = "pg_probackup" ;
32
41
33
42
/* list of files contained in backup */
@@ -65,6 +74,7 @@ static void do_backup_database(parray *backup_list);
65
74
static void pg_start_backup (const char * label , bool smooth , pgBackup * backup );
66
75
static void pg_switch_wal (void );
67
76
static void pg_stop_backup (pgBackup * backup );
77
+ static int checkpoint_timeout (void );
68
78
69
79
static void add_pgdata_files (parray * files , const char * root );
70
80
static void write_backup_file_list (parray * files , const char * root );
@@ -785,8 +795,12 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid,
785
795
}
786
796
787
797
/*
788
- * Wait for target 'lsn' to be archived in archive 'wal' directory with
789
- * WAL segment file.
798
+ * Wait for target 'lsn'.
799
+ *
800
+ * If current backup started in archive mode wait for 'lsn' to be archived in
801
+ * archive 'wal' directory with WAL segment file.
802
+ * If current backup started in stream mode wait for 'lsn' to be streamed in
803
+ * 'pg_xlog' directory.
790
804
*/
791
805
static void
792
806
wait_wal_lsn (XLogRecPtr lsn )
@@ -807,45 +821,28 @@ wait_wal_lsn(XLogRecPtr lsn)
807
821
808
822
if (stream_wal )
809
823
{
810
- PGresult * res ;
811
- const char * val ;
812
- const char * hintmsg ;
813
-
814
824
pgBackupGetPath2 (& current , wal_dir , lengthof (wal_dir ),
815
825
DATABASE_DIR , PG_XLOG_DIR );
816
826
join_path_components (wal_segment_full_path , wal_dir , wal_segment );
817
827
818
- res = pgut_execute (backup_conn , "show checkpoint_timeout" , 0 , NULL );
819
- val = PQgetvalue (res , 0 , 0 );
820
- PQclear (res );
821
-
822
- if (!parse_int (val , (int * ) & timeout , OPTION_UNIT_S ,
823
- & hintmsg ))
824
- {
825
- if (hintmsg )
826
- elog (ERROR , "Invalid value of checkout_timeout %s: %s" , val ,
827
- hintmsg );
828
- else
829
- elog (ERROR , "Invalid value of checkout_timeout %s" , val );
830
- }
831
- /* Add 3 seconds to the initial value of checkpoint_timeout */
832
- timeout = timeout + 3 ;
828
+ timeout = (uint32 ) checkpoint_timeout ();
829
+ timeout = timeout + timeout * 0.1 ;
833
830
}
834
831
else
835
832
{
836
833
join_path_components (wal_segment_full_path , arclog_path , wal_segment );
837
834
timeout = archive_timeout ;
838
835
}
839
836
840
- /* Wait until target LSN is archived */
837
+ /* Wait until target LSN is archived or streamed */
841
838
while (true)
842
839
{
843
840
bool file_exists = fileExists (wal_segment_full_path );
844
841
845
842
if (file_exists )
846
843
{
847
844
/*
848
- * WAL segment was archived . Check LSN on it.
845
+ * A WAL segment was found. Check LSN on it.
849
846
*/
850
847
if ((stream_wal && wal_contains_lsn (wal_dir , lsn , tli )) ||
851
848
(!stream_wal && wal_contains_lsn (arclog_path , lsn , tli )))
@@ -996,11 +993,12 @@ pg_stop_backup(pgBackup *backup)
996
993
PQclear (res );
997
994
998
995
if (stream_wal )
999
- {
1000
996
/* Wait for the completion of stream */
1001
- elog (INFO , "Wait end of WAL streaming" );
1002
997
pthread_join (stream_thread , NULL );
1003
- }
998
+ /*
999
+ * Wait for stop_lsn to be archived or streamed.
1000
+ * We wait for stop_lsn in stream mode just in case.
1001
+ */
1004
1002
wait_wal_lsn (stop_backup_lsn );
1005
1003
1006
1004
/* Fill in fields if that is the correct end of backup. */
@@ -1032,6 +1030,33 @@ pg_stop_backup(pgBackup *backup)
1032
1030
}
1033
1031
}
1034
1032
1033
+ /*
1034
+ * Retreive checkpoint_timeout GUC value in seconds.
1035
+ */
1036
+ static int
1037
+ checkpoint_timeout (void )
1038
+ {
1039
+ PGresult * res ;
1040
+ const char * val ;
1041
+ const char * hintmsg ;
1042
+ int val_int ;
1043
+
1044
+ res = pgut_execute (backup_conn , "show checkpoint_timeout" , 0 , NULL );
1045
+ val = PQgetvalue (res , 0 , 0 );
1046
+ PQclear (res );
1047
+
1048
+ if (!parse_int (val , & val_int , OPTION_UNIT_S , & hintmsg ))
1049
+ {
1050
+ if (hintmsg )
1051
+ elog (ERROR , "Invalid value of checkout_timeout %s: %s" , val ,
1052
+ hintmsg );
1053
+ else
1054
+ elog (ERROR , "Invalid value of checkout_timeout %s" , val );
1055
+ }
1056
+
1057
+ return val_int ;
1058
+ }
1059
+
1035
1060
/*
1036
1061
* Return true if the path is an existing regular file.
1037
1062
*/
@@ -1647,8 +1672,28 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
1647
1672
elog (LOG , _ ("switched to timeline %u at %X/%X\n" ),
1648
1673
timeline , (uint32 ) (prevpos >> 32 ), (uint32 ) prevpos );
1649
1674
1650
- if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn )
1651
- return true;
1675
+ if (!XLogRecPtrIsInvalid (stop_backup_lsn ))
1676
+ {
1677
+ if (xlogpos > stop_backup_lsn )
1678
+ return true;
1679
+
1680
+ /* pg_stop_backup() was executed, wait for the completion of stream */
1681
+ if (stream_stop_timeout == 0 )
1682
+ {
1683
+ elog (INFO , "Wait for LSN %X/%X to be streamed" ,
1684
+ (uint32 ) (stop_backup_lsn >> 32 ), (uint32 ) stop_backup_lsn );
1685
+
1686
+ stream_stop_timeout = checkpoint_timeout ();
1687
+ stream_stop_timeout = stream_stop_timeout + stream_stop_timeout * 0.1 ;
1688
+
1689
+ stream_stop_begin = time (NULL );
1690
+ }
1691
+
1692
+ if (time (NULL ) - stream_stop_begin > stream_stop_timeout )
1693
+ elog (ERROR , "Target LSN %X/%X could not be streamed in %d seconds" ,
1694
+ (uint32 ) (stop_backup_lsn >> 32 ), (uint32 ) stop_backup_lsn ,
1695
+ stream_stop_timeout );
1696
+ }
1652
1697
1653
1698
prevtimeline = timeline ;
1654
1699
prevpos = xlogpos ;
@@ -1709,6 +1754,10 @@ StreamLog(void *arg)
1709
1754
*/
1710
1755
startpos -= startpos % XLOG_SEG_SIZE ;
1711
1756
1757
+ /* Initialize timeout */
1758
+ stream_stop_timeout = 0 ;
1759
+ stream_stop_begin = 0 ;
1760
+
1712
1761
/*
1713
1762
* Start the replication
1714
1763
*/
0 commit comments