@@ -47,8 +47,6 @@ typedef struct
47
47
parray * files ;
48
48
parray * prev_files ;
49
49
const XLogRecPtr * lsn ;
50
- unsigned int start_file_idx ;
51
- unsigned int end_file_idx ;
52
50
} backup_files_args ;
53
51
54
52
/*
@@ -106,6 +104,7 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
106
104
pthread_t stream_thread ;
107
105
backup_files_args * backup_threads_args [num_threads ];
108
106
107
+
109
108
/* repack the options */
110
109
bool smooth_checkpoint = bkupopt .smooth_checkpoint ;
111
110
pgBackup * prev_backup = NULL ;
@@ -315,27 +314,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
315
314
if (num_threads < 1 )
316
315
num_threads = 1 ;
317
316
317
+ /* sort by size for load balancing */
318
+ parray_qsort (backup_files_list , pgFileCompareSize );
319
+
320
+ /* init thread args with own file lists */
318
321
for (i = 0 ; i < num_threads ; i ++ )
319
322
{
320
323
backup_files_args * arg = pg_malloc (sizeof (backup_files_args ));
321
324
arg -> from_root = pgdata ;
322
325
arg -> to_root = path ;
323
- arg -> files = backup_files_list ;
326
+ arg -> files = parray_new () ;
324
327
arg -> prev_files = prev_files ;
325
328
arg -> lsn = lsn ;
326
- arg -> start_file_idx = i * (parray_num (backup_files_list )/num_threads );
327
- if (i == num_threads - 1 )
328
- arg -> end_file_idx = parray_num (backup_files_list );
329
- else
330
- arg -> end_file_idx = (i + 1 ) * (parray_num (backup_files_list )/num_threads );
329
+ backup_threads_args [i ] = arg ;
330
+ }
331
+
332
+ /* balance load between threads */
333
+ for (i = 0 ; i < parray_num (backup_files_list ); i ++ )
334
+ {
335
+ int cur_thread = i % num_threads ;
336
+ parray_append (backup_threads_args [cur_thread ]-> files ,
337
+ parray_get (backup_files_list , i ));
338
+ }
331
339
340
+ /* Run threads */
341
+ for (i = 0 ; i < num_threads ; i ++ )
342
+ {
332
343
if (verbose )
333
- elog (WARNING , "Start thread for start_file_idx:%i end_file_idx:%i num:%li" ,
334
- arg -> start_file_idx ,
335
- arg -> end_file_idx ,
336
- parray_num (backup_files_list ));
337
- backup_threads_args [i ] = arg ;
338
- pthread_create (& backup_threads [i ], NULL , (void * (* )(void * )) backup_files , arg );
344
+ elog (WARNING , "Start thread num:%li" , parray_num (backup_threads_args [i ]-> files ));
345
+ pthread_create (& backup_threads [i ], NULL , (void * (* )(void * )) backup_files , backup_threads_args [i ]);
339
346
}
340
347
341
348
/* Wait theads */
@@ -911,7 +918,7 @@ backup_files(void *arg)
911
918
gettimeofday (& tv , NULL );
912
919
913
920
/* backup a file or create a directory */
914
- for (i = arguments -> start_file_idx ; i < arguments -> end_file_idx ; i ++ )
921
+ for (i = 0 ; i < parray_num ( arguments -> files ) ; i ++ )
915
922
{
916
923
int ret ;
917
924
struct stat buf ;
0 commit comments