@@ -33,7 +33,11 @@ const char *progname = "pg_probackup";
33
33
/* list of files contained in backup */
34
34
static parray * backup_files_list = NULL ;
35
35
36
- static pthread_mutex_t check_stream_mut = PTHREAD_MUTEX_INITIALIZER ;
36
+ static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER ;
37
+ /*
38
+ * We need to wait for the end of WAL streaming before executing pg_stop_backup().
39
+ */
40
+ static pthread_t stream_thread ;
37
41
38
42
static int is_ptrack_enable = false;
39
43
@@ -105,7 +109,6 @@ do_backup_database(parray *backup_list)
105
109
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr ;
106
110
107
111
pthread_t backup_threads [num_threads ];
108
- pthread_t stream_thread ;
109
112
backup_files_args * backup_threads_args [num_threads ];
110
113
111
114
pgBackup * prev_backup = NULL ;
@@ -142,24 +145,6 @@ do_backup_database(parray *backup_list)
142
145
strncat (label , " with pg_probackup" , lengthof (label ));
143
146
pg_start_backup (label , smooth_checkpoint , & current );
144
147
145
- pgBackupGetPath (& current , database_path , lengthof (database_path ),
146
- DATABASE_DIR );
147
-
148
- /* start stream replication */
149
- if (stream_wal )
150
- {
151
- join_path_components (dst_backup_path , database_path , PG_XLOG_DIR );
152
- dir_create_dir (dst_backup_path , DIR_PERMISSION );
153
-
154
- pthread_mutex_lock (& check_stream_mut );
155
- pthread_create (& stream_thread , NULL , (void * (* )(void * )) StreamLog , dst_backup_path );
156
- pthread_mutex_lock (& check_stream_mut );
157
- if (conn == NULL )
158
- elog (ERROR , "Cannot continue backup because stream connect has failed." );
159
-
160
- pthread_mutex_unlock (& check_stream_mut );
161
- }
162
-
163
148
/*
164
149
* If backup_label does not exist in $PGDATA, stop taking backup.
165
150
* NOTE. We can check it only on master, though.
@@ -178,6 +163,24 @@ do_backup_database(parray *backup_list)
178
163
}
179
164
}
180
165
166
+ pgBackupGetPath (& current , database_path , lengthof (database_path ),
167
+ DATABASE_DIR );
168
+
169
+ /* start stream replication */
170
+ if (stream_wal )
171
+ {
172
+ join_path_components (dst_backup_path , database_path , PG_XLOG_DIR );
173
+ dir_create_dir (dst_backup_path , DIR_PERMISSION );
174
+
175
+ pthread_mutex_lock (& start_stream_mut );
176
+ pthread_create (& stream_thread , NULL , (void * (* )(void * )) StreamLog , dst_backup_path );
177
+ pthread_mutex_lock (& start_stream_mut );
178
+ if (conn == NULL )
179
+ elog (ERROR , "Cannot continue backup because stream connect has failed." );
180
+
181
+ pthread_mutex_unlock (& start_stream_mut );
182
+ }
183
+
181
184
/*
182
185
* To take incremental backup get the filelist of the last completed database
183
186
*/
@@ -299,7 +302,7 @@ do_backup_database(parray *backup_list)
299
302
pthread_create (& backup_threads [i ], NULL , (void * (* )(void * )) backup_files , backup_threads_args [i ]);
300
303
}
301
304
302
- /* Wait theads */
305
+ /* Wait threads */
303
306
for (i = 0 ; i < num_threads ; i ++ )
304
307
{
305
308
pthread_join (backup_threads [i ], NULL );
@@ -322,9 +325,6 @@ do_backup_database(parray *backup_list)
322
325
parray * xlog_files_list ;
323
326
char pg_xlog_path [MAXPGPATH ];
324
327
325
- /* Wait for the completion of stream */
326
- pthread_join (stream_thread , NULL );
327
-
328
328
/* Scan backup PG_XLOG_DIR */
329
329
xlog_files_list = parray_new ();
330
330
join_path_components (pg_xlog_path , database_path , PG_XLOG_DIR );
@@ -796,7 +796,8 @@ wait_wal_lsn(XLogRecPtr lsn)
796
796
char wal_dir [MAXPGPATH ],
797
797
wal_segment_full_path [MAXPGPATH ];
798
798
char wal_segment [MAXFNAMELEN ];
799
- uint32 try_count = 0 ;
799
+ uint32 try_count = 0 ,
800
+ timeout ;
800
801
801
802
tli = get_current_timeline (false);
802
803
@@ -806,12 +807,35 @@ wait_wal_lsn(XLogRecPtr lsn)
806
807
807
808
if (stream_wal )
808
809
{
810
+ PGresult * res ;
811
+ const char * val ;
812
+ const char * hintmsg ;
813
+
809
814
pgBackupGetPath2 (& current , wal_dir , lengthof (wal_dir ),
810
815
DATABASE_DIR , PG_XLOG_DIR );
811
816
join_path_components (wal_segment_full_path , wal_dir , wal_segment );
817
+
818
+ res = pgut_execute (backup_conn , "show checkpoint_timeout" , 0 , NULL );
819
+ val = PQgetvalue (res , 0 , 0 );
820
+ PQclear (res );
821
+
822
+ if (!parse_int (val , (int * ) & timeout , OPTION_UNIT_S ,
823
+ & hintmsg ))
824
+ {
825
+ if (hintmsg )
826
+ elog (ERROR , "Invalid value of checkout_timeout %s: %s" , val ,
827
+ hintmsg );
828
+ else
829
+ elog (ERROR , "Invalid value of checkout_timeout %s" , val );
830
+ }
831
+ /* Add 3 seconds to the initial value of checkpoint_timeout */
832
+ timeout = timeout + 3 ;
812
833
}
813
834
else
835
+ {
814
836
join_path_components (wal_segment_full_path , arclog_path , wal_segment );
837
+ timeout = archive_timeout ;
838
+ }
815
839
816
840
/* Wait until switched WAL is archived */
817
841
while (!fileExists (wal_segment_full_path ))
@@ -826,10 +850,10 @@ wait_wal_lsn(XLogRecPtr lsn)
826
850
elog (INFO , "wait for LSN %X/%X in archived WAL segment %s" ,
827
851
(uint32 ) (lsn >> 32 ), (uint32 ) lsn , wal_segment_full_path );
828
852
829
- if (archive_timeout > 0 && try_count > archive_timeout )
853
+ if (timeout > 0 && try_count > timeout )
830
854
elog (ERROR ,
831
855
"switched WAL segment %s could not be archived in %d seconds" ,
832
- wal_segment , archive_timeout );
856
+ wal_segment , timeout );
833
857
}
834
858
835
859
/*
@@ -959,6 +983,9 @@ pg_stop_backup(pgBackup *backup)
959
983
960
984
PQclear (res );
961
985
986
+ if (stream_wal )
987
+ /* Wait for the completion of stream */
988
+ pthread_join (stream_thread , NULL );
962
989
wait_wal_lsn (stop_backup_lsn );
963
990
964
991
/* Fill in fields if that is the correct end of backup. */
@@ -1631,7 +1658,7 @@ StreamLog(void *arg)
1631
1658
conn = GetConnection ();
1632
1659
if (!conn )
1633
1660
{
1634
- pthread_mutex_unlock (& check_stream_mut );
1661
+ pthread_mutex_unlock (& start_stream_mut );
1635
1662
/* Error message already written in GetConnection() */
1636
1663
return ;
1637
1664
}
@@ -1655,7 +1682,7 @@ StreamLog(void *arg)
1655
1682
disconnect_and_exit (1 );
1656
1683
1657
1684
/* Ok, we have a normal stream connection and the main process can work again */
1658
- pthread_mutex_unlock (& check_stream_mut );
1685
+ pthread_mutex_unlock (& start_stream_mut );
1659
1686
1660
1687
/*
1661
1688
* We must use startpos as start_lsn from start_backup
0 commit comments