16
16
#include <unistd.h>
17
17
#include <dirent.h>
18
18
#include <time.h>
19
+ #include <pthread.h>
19
20
20
21
#include "libpq/pqsignal.h"
21
22
#include "pgut/pgut-port.h"
@@ -33,12 +34,22 @@ static bool in_backup = false; /* TODO: more robust logic */
33
34
/* list of files contained in backup */
34
35
parray * backup_files_list ;
35
36
37
+ typedef struct
38
+ {
39
+ const char * from_root ;
40
+ const char * to_root ;
41
+ parray * files ;
42
+ parray * prev_files ;
43
+ const XLogRecPtr * lsn ;
44
+ unsigned int start_file_idx ;
45
+ unsigned int end_file_idx ;
46
+ } backup_files_args ;
47
+
36
48
/*
37
49
* Backup routines
38
50
*/
39
51
static void backup_cleanup (bool fatal , void * userdata );
40
- static void backup_files (const char * from_root , const char * to_root ,
41
- parray * files , parray * prev_files , const XLogRecPtr * lsn , const char * prefix );
52
+ static void backup_files (void * arg );
42
53
static parray * do_backup_database (parray * backup_list , pgBackupOption bkupopt );
43
54
static void confirm_block_size (const char * name , int blcksz );
44
55
static void pg_start_backup (const char * label , bool smooth , pgBackup * backup );
@@ -72,6 +83,8 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
72
83
char prev_file_txt [MAXPGPATH ]; /* path of the previous backup
73
84
* list file */
74
85
bool has_backup_label = true; /* flag if backup_label is there */
86
+ pthread_t backup_threads [num_threads ];
87
+ backup_files_args * backup_threads_args [num_threads ];
75
88
76
89
/* repack the options */
77
90
bool smooth_checkpoint = bkupopt .smooth_checkpoint ;
@@ -221,7 +234,77 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
221
234
make_pagemap_from_ptrack (backup_files_list );
222
235
}
223
236
224
- backup_files (pgdata , path , backup_files_list , prev_files , lsn , NULL );
237
+ /* sort pathname ascending */
238
+ parray_qsort (backup_files_list , pgFileComparePath );
239
+
240
+ /* make dirs before backup */
241
+ for (i = 0 ; i < parray_num (backup_files_list ); i ++ )
242
+ {
243
+ int ret ;
244
+ struct stat buf ;
245
+ pgFile * file = (pgFile * ) parray_get (backup_files_list , i );
246
+
247
+ ret = stat (file -> path , & buf );
248
+ if (ret == -1 )
249
+ {
250
+ if (errno == ENOENT )
251
+ {
252
+ /* record as skipped file in file_xxx.txt */
253
+ file -> write_size = BYTES_INVALID ;
254
+ elog (LOG , "skip" );
255
+ continue ;
256
+ }
257
+ else
258
+ {
259
+ elog (ERROR ,
260
+ "can't stat backup mode. \"%s\": %s" ,
261
+ file -> path , strerror (errno ));
262
+ }
263
+ }
264
+ /* if the entry was a directory, create it in the backup */
265
+ if (S_ISDIR (buf .st_mode ))
266
+ {
267
+ char dirpath [MAXPGPATH ];
268
+ if (verbose )
269
+ elog (LOG , "Make dir %s" , file -> path + strlen (pgdata ) + 1 );
270
+ join_path_components (dirpath , path , JoinPathEnd (file -> path , pgdata ));
271
+ if (!check )
272
+ dir_create_dir (dirpath , DIR_PERMISSION );
273
+ }
274
+ }
275
+
276
+ if (num_threads < 1 )
277
+ num_threads = 1 ;
278
+
279
+ for (i = 0 ; i < num_threads ; i ++ )
280
+ {
281
+ backup_files_args * arg = pg_malloc (sizeof (backup_files_args ));
282
+ arg -> from_root = pgdata ;
283
+ arg -> to_root = path ;
284
+ arg -> files = backup_files_list ;
285
+ arg -> prev_files = prev_files ;
286
+ arg -> lsn = lsn ;
287
+ arg -> start_file_idx = i * (parray_num (backup_files_list )/num_threads );
288
+ if (i == num_threads - 1 )
289
+ arg -> end_file_idx = parray_num (backup_files_list );
290
+ else
291
+ arg -> end_file_idx = (i + 1 ) * (parray_num (backup_files_list )/num_threads );
292
+
293
+ if (verbose )
294
+ elog (WARNING , "Start thread for start_file_idx:%i end_file_idx:%i num:%li" ,
295
+ arg -> start_file_idx ,
296
+ arg -> end_file_idx ,
297
+ parray_num (backup_files_list ));
298
+ backup_threads_args [i ] = arg ;
299
+ pthread_create (& backup_threads [i ], NULL , (void * (* )(void * )) backup_files , arg );
300
+ }
301
+
302
+ /* Wait theads */
303
+ for (i = 0 ; i < num_threads ; i ++ )
304
+ {
305
+ pthread_join (backup_threads [i ], NULL );
306
+ pg_free (backup_threads_args [i ]);
307
+ }
225
308
226
309
/* Clear ptrack files after backup */
227
310
if (current .backup_mode == BACKUP_MODE_DIFF_PTRACK )
@@ -266,8 +349,8 @@ do_backup(pgBackupOption bkupopt)
266
349
int ret ;
267
350
268
351
/* repack the necessary options */
269
- int keep_data_generations = bkupopt .keep_data_generations ;
270
- int keep_data_days = bkupopt .keep_data_days ;
352
+ int keep_data_generations = bkupopt .keep_data_generations ;
353
+ int keep_data_days = bkupopt .keep_data_days ;
271
354
272
355
/* PGDATA and BACKUP_MODE are always required */
273
356
if (pgdata == NULL )
@@ -656,28 +739,22 @@ backup_cleanup(bool fatal, void *userdata)
656
739
* Take differential backup at page level.
657
740
*/
658
741
static void
659
- backup_files (const char * from_root ,
660
- const char * to_root ,
661
- parray * files ,
662
- parray * prev_files ,
663
- const XLogRecPtr * lsn ,
664
- const char * prefix )
742
+ backup_files (void * arg )
665
743
{
666
744
int i ;
667
745
struct timeval tv ;
668
746
669
- /* sort pathname ascending */
670
- parray_qsort (files , pgFileComparePath );
747
+ backup_files_args * arguments = (backup_files_args * ) arg ;
671
748
672
749
gettimeofday (& tv , NULL );
673
750
674
751
/* backup a file or create a directory */
675
- for (i = 0 ; i < parray_num ( files ) ; i ++ )
752
+ for (i = arguments -> start_file_idx ; i < arguments -> end_file_idx ; i ++ )
676
753
{
677
754
int ret ;
678
755
struct stat buf ;
679
756
680
- pgFile * file = (pgFile * ) parray_get (files , i );
757
+ pgFile * file = (pgFile * ) parray_get (arguments -> files , i );
681
758
682
759
/* If current time is rewinded, abort this backup. */
683
760
if (tv .tv_sec < file -> mtime )
@@ -690,18 +767,8 @@ backup_files(const char *from_root,
690
767
691
768
/* print progress in verbose mode */
692
769
if (verbose )
693
- {
694
- if (prefix )
695
- {
696
- char path [MAXPGPATH ];
697
- join_path_components (path , prefix , file -> path + strlen (from_root ) + 1 );
698
- elog (LOG , "(%d/%lu) %s" , i + 1 ,
699
- (unsigned long ) parray_num (files ), path );
700
- }
701
- else
702
- elog (LOG , "(%d/%lu) %s" , i + 1 , (unsigned long ) parray_num (files ),
703
- file -> path + strlen (from_root ) + 1 );
704
- }
770
+ elog (LOG , "(%d/%lu) %s" , i + 1 , (unsigned long ) parray_num (arguments -> files ),
771
+ file -> path + strlen (arguments -> from_root ) + 1 );
705
772
706
773
/* stat file to get file type, size and modify timestamp */
707
774
ret = stat (file -> path , & buf );
@@ -722,52 +789,20 @@ backup_files(const char *from_root,
722
789
}
723
790
}
724
791
725
- /* if the entry was a directory, create it in the backup */
792
+ /* skip dir because make before */
726
793
if (S_ISDIR (buf .st_mode ))
727
794
{
728
- char dirpath [MAXPGPATH ];
729
-
730
- join_path_components (dirpath , to_root , JoinPathEnd (file -> path , from_root ));
731
- if (!check )
732
- dir_create_dir (dirpath , DIR_PERMISSION );
733
- elog (LOG , "directory" );
795
+ continue ;
734
796
}
735
797
else if (S_ISREG (buf .st_mode ))
736
798
{
737
799
/* skip files which have not been modified since last backup */
738
- if (prev_files )
800
+ if (arguments -> prev_files )
739
801
{
740
802
pgFile * prev_file = NULL ;
741
-
742
- /*
743
- * If prefix is not NULL, the table space is backup from the snapshot.
744
- * Therefore, adjust file name to correspond to the file list.
745
- */
746
- if (prefix )
747
- {
748
- int j ;
749
-
750
- for (j = 0 ; j < parray_num (prev_files ); j ++ )
751
- {
752
- pgFile * p = (pgFile * ) parray_get (prev_files , j );
753
- char * prev_path ;
754
- char curr_path [MAXPGPATH ];
755
-
756
- prev_path = p -> path + strlen (from_root ) + 1 ;
757
- join_path_components (curr_path , prefix , file -> path + strlen (from_root ) + 1 );
758
- if (strcmp (curr_path , prev_path ) == 0 )
759
- {
760
- prev_file = p ;
761
- break ;
762
- }
763
- }
764
- }
765
- else
766
- {
767
- pgFile * * p = (pgFile * * ) parray_bsearch (prev_files , file , pgFileComparePath );
768
- if (p )
769
- prev_file = * p ;
770
- }
803
+ pgFile * * p = (pgFile * * ) parray_bsearch (arguments -> prev_files , file , pgFileComparePath );
804
+ if (p )
805
+ prev_file = * p ;
771
806
772
807
if (prev_file && prev_file -> mtime == file -> mtime )
773
808
{
@@ -797,8 +832,8 @@ backup_files(const char *from_root,
797
832
798
833
/* copy the file into backup */
799
834
if (!(file -> is_datafile
800
- ? backup_data_file (from_root , to_root , file , lsn )
801
- : copy_file (from_root , to_root , file )))
835
+ ? backup_data_file (arguments -> from_root , arguments -> to_root , file , arguments -> lsn )
836
+ : copy_file (arguments -> from_root , arguments -> to_root , file )))
802
837
{
803
838
/* record as skipped file in file_xxx.txt */
804
839
file -> write_size = BYTES_INVALID ;
0 commit comments