Skip to content

Commit fed13c2

Browse files
committed
Add multithreaded backup.
1 parent 6d51bb1 commit fed13c2

File tree

7 files changed

+138
-82
lines changed

7 files changed

+138
-82
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ DOCS += doc/pg_arman.html doc/README.html
3333
endif # XMLTO
3434
endif # ASCIIDOC
3535

36-
PG_CPPFLAGS = -I$(libpq_srcdir)
37-
override CPPFLAGS := -DFRONTEND $(CPPFLAGS)
38-
PG_LIBS = $(libpq_pgport)
36+
PG_CPPFLAGS = -I$(libpq_srcdir) ${PTHREAD_CFLAGS}
37+
override CPPFLAGS := -DFRONTEND $(CPPFLAGS)
38+
PG_LIBS = $(libpq_pgport) ${PTHREAD_LIBS} ${PTHREAD_CFLAGS}
3939

4040
REGRESS = init option show delete backup restore
4141

backup.c

Lines changed: 102 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <unistd.h>
1717
#include <dirent.h>
1818
#include <time.h>
19+
#include <pthread.h>
1920

2021
#include "libpq/pqsignal.h"
2122
#include "pgut/pgut-port.h"
@@ -33,12 +34,22 @@ static bool in_backup = false; /* TODO: more robust logic */
3334
/* list of files contained in backup */
3435
parray *backup_files_list;
3536

37+
typedef struct
38+
{
39+
const char *from_root;
40+
const char *to_root;
41+
parray *files;
42+
parray *prev_files;
43+
const XLogRecPtr *lsn;
44+
unsigned int start_file_idx;
45+
unsigned int end_file_idx;
46+
} backup_files_args;
47+
3648
/*
3749
* Backup routines
3850
*/
3951
static void backup_cleanup(bool fatal, void *userdata);
40-
static void backup_files(const char *from_root, const char *to_root,
41-
parray *files, parray *prev_files, const XLogRecPtr *lsn, const char *prefix);
52+
static void backup_files(void *arg);
4253
static parray *do_backup_database(parray *backup_list, pgBackupOption bkupopt);
4354
static void confirm_block_size(const char *name, int blcksz);
4455
static void pg_start_backup(const char *label, bool smooth, pgBackup *backup);
@@ -72,6 +83,8 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
7283
char prev_file_txt[MAXPGPATH]; /* path of the previous backup
7384
* list file */
7485
bool has_backup_label = true; /* flag if backup_label is there */
86+
pthread_t backup_threads[num_threads];
87+
backup_files_args *backup_threads_args[num_threads];
7588

7689
/* repack the options */
7790
bool smooth_checkpoint = bkupopt.smooth_checkpoint;
@@ -221,7 +234,77 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
221234
make_pagemap_from_ptrack(backup_files_list);
222235
}
223236

224-
backup_files(pgdata, path, backup_files_list, prev_files, lsn, NULL);
237+
/* sort pathname ascending */
238+
parray_qsort(backup_files_list, pgFileComparePath);
239+
240+
/* make dirs before backup */
241+
for (i = 0; i < parray_num(backup_files_list); i++)
242+
{
243+
int ret;
244+
struct stat buf;
245+
pgFile *file = (pgFile *) parray_get(backup_files_list, i);
246+
247+
ret = stat(file->path, &buf);
248+
if (ret == -1)
249+
{
250+
if (errno == ENOENT)
251+
{
252+
/* record as skipped file in file_xxx.txt */
253+
file->write_size = BYTES_INVALID;
254+
elog(LOG, "skip");
255+
continue;
256+
}
257+
else
258+
{
259+
elog(ERROR,
260+
"can't stat backup mode. \"%s\": %s",
261+
file->path, strerror(errno));
262+
}
263+
}
264+
/* if the entry was a directory, create it in the backup */
265+
if (S_ISDIR(buf.st_mode))
266+
{
267+
char dirpath[MAXPGPATH];
268+
if (verbose)
269+
elog(LOG, "Make dir %s", file->path + strlen(pgdata) + 1);
270+
join_path_components(dirpath, path, JoinPathEnd(file->path, pgdata));
271+
if (!check)
272+
dir_create_dir(dirpath, DIR_PERMISSION);
273+
}
274+
}
275+
276+
if (num_threads < 1)
277+
num_threads = 1;
278+
279+
for (i = 0; i < num_threads; i++)
280+
{
281+
backup_files_args *arg = pg_malloc(sizeof(backup_files_args));
282+
arg->from_root = pgdata;
283+
arg->to_root = path;
284+
arg->files = backup_files_list;
285+
arg->prev_files = prev_files;
286+
arg->lsn = lsn;
287+
arg->start_file_idx = i * (parray_num(backup_files_list)/num_threads);
288+
if (i == num_threads - 1)
289+
arg->end_file_idx = parray_num(backup_files_list);
290+
else
291+
arg->end_file_idx = (i + 1) * (parray_num(backup_files_list)/num_threads);
292+
293+
if (verbose)
294+
elog(WARNING, "Start thread for start_file_idx:%i end_file_idx:%i num:%li",
295+
arg->start_file_idx,
296+
arg->end_file_idx,
297+
parray_num(backup_files_list));
298+
backup_threads_args[i] = arg;
299+
pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, arg);
300+
}
301+
302+
/* Wait for all backup threads to finish */
303+
for (i = 0; i < num_threads; i++)
304+
{
305+
pthread_join(backup_threads[i], NULL);
306+
pg_free(backup_threads_args[i]);
307+
}
225308

226309
/* Clear ptrack files after backup */
227310
if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
@@ -266,8 +349,8 @@ do_backup(pgBackupOption bkupopt)
266349
int ret;
267350

268351
/* repack the necessary options */
269-
int keep_data_generations = bkupopt.keep_data_generations;
270-
int keep_data_days = bkupopt.keep_data_days;
352+
int keep_data_generations = bkupopt.keep_data_generations;
353+
int keep_data_days = bkupopt.keep_data_days;
271354

272355
/* PGDATA and BACKUP_MODE are always required */
273356
if (pgdata == NULL)
@@ -656,28 +739,22 @@ backup_cleanup(bool fatal, void *userdata)
656739
* Take differential backup at page level.
657740
*/
658741
static void
659-
backup_files(const char *from_root,
660-
const char *to_root,
661-
parray *files,
662-
parray *prev_files,
663-
const XLogRecPtr *lsn,
664-
const char *prefix)
742+
backup_files(void *arg)
665743
{
666744
int i;
667745
struct timeval tv;
668746

669-
/* sort pathname ascending */
670-
parray_qsort(files, pgFileComparePath);
747+
backup_files_args *arguments = (backup_files_args *) arg;
671748

672749
gettimeofday(&tv, NULL);
673750

674751
/* backup a file or create a directory */
675-
for (i = 0; i < parray_num(files); i++)
752+
for (i = arguments->start_file_idx; i < arguments->end_file_idx; i++)
676753
{
677754
int ret;
678755
struct stat buf;
679756

680-
pgFile *file = (pgFile *) parray_get(files, i);
757+
pgFile *file = (pgFile *) parray_get(arguments->files, i);
681758

682759
/* If the current time has been rewound, abort this backup. */
683760
if (tv.tv_sec < file->mtime)
@@ -690,18 +767,8 @@ backup_files(const char *from_root,
690767

691768
/* print progress in verbose mode */
692769
if (verbose)
693-
{
694-
if (prefix)
695-
{
696-
char path[MAXPGPATH];
697-
join_path_components(path, prefix, file->path + strlen(from_root) + 1);
698-
elog(LOG, "(%d/%lu) %s", i + 1,
699-
(unsigned long) parray_num(files), path);
700-
}
701-
else
702-
elog(LOG, "(%d/%lu) %s", i + 1, (unsigned long) parray_num(files),
703-
file->path + strlen(from_root) + 1);
704-
}
770+
elog(LOG, "(%d/%lu) %s", i + 1, (unsigned long) parray_num(arguments->files),
771+
file->path + strlen(arguments->from_root) + 1);
705772

706773
/* stat file to get file type, size and modify timestamp */
707774
ret = stat(file->path, &buf);
@@ -722,52 +789,20 @@ backup_files(const char *from_root,
722789
}
723790
}
724791

725-
/* if the entry was a directory, create it in the backup */
792+
/* skip directories: they were already created before the threads started */
726793
if (S_ISDIR(buf.st_mode))
727794
{
728-
char dirpath[MAXPGPATH];
729-
730-
join_path_components(dirpath, to_root, JoinPathEnd(file->path, from_root));
731-
if (!check)
732-
dir_create_dir(dirpath, DIR_PERMISSION);
733-
elog(LOG, "directory");
795+
continue;
734796
}
735797
else if (S_ISREG(buf.st_mode))
736798
{
737799
/* skip files which have not been modified since last backup */
738-
if (prev_files)
800+
if (arguments->prev_files)
739801
{
740802
pgFile *prev_file = NULL;
741-
742-
/*
743-
* If prefix is not NULL, the table space is backup from the snapshot.
744-
* Therefore, adjust file name to correspond to the file list.
745-
*/
746-
if (prefix)
747-
{
748-
int j;
749-
750-
for (j = 0; j < parray_num(prev_files); j++)
751-
{
752-
pgFile *p = (pgFile *) parray_get(prev_files, j);
753-
char *prev_path;
754-
char curr_path[MAXPGPATH];
755-
756-
prev_path = p->path + strlen(from_root) + 1;
757-
join_path_components(curr_path, prefix, file->path + strlen(from_root) + 1);
758-
if (strcmp(curr_path, prev_path) == 0)
759-
{
760-
prev_file = p;
761-
break;
762-
}
763-
}
764-
}
765-
else
766-
{
767-
pgFile **p = (pgFile **) parray_bsearch(prev_files, file, pgFileComparePath);
768-
if (p)
769-
prev_file = *p;
770-
}
803+
pgFile **p = (pgFile **) parray_bsearch(arguments->prev_files, file, pgFileComparePath);
804+
if (p)
805+
prev_file = *p;
771806

772807
if (prev_file && prev_file->mtime == file->mtime)
773808
{
@@ -797,8 +832,8 @@ backup_files(const char *from_root,
797832

798833
/* copy the file into backup */
799834
if (!(file->is_datafile
800-
? backup_data_file(from_root, to_root, file, lsn)
801-
: copy_file(from_root, to_root, file)))
835+
? backup_data_file(arguments->from_root, arguments->to_root, file, arguments->lsn)
836+
: copy_file(arguments->from_root, arguments->to_root, file)))
802837
{
803838
/* record as skipped file in file_xxx.txt */
804839
file->write_size = BYTES_INVALID;

expected/backup.out

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,9 @@ page-level backup without validated full backup
3838
0
3939
2
4040
6
41+
###### BACKUP COMMAND TEST-0007 ######
42+
###### ptrack multi thread backup mode ######
43+
0
44+
0
45+
2
46+
6

pg_arman.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pgBackup current;
3333
static bool smooth_checkpoint;
3434
static int keep_data_generations = KEEP_INFINITE;
3535
static int keep_data_days = KEEP_INFINITE;
36+
int num_threads = 1;
3637
static bool backup_validate = false;
3738

3839
/* restore configuration */
@@ -55,6 +56,7 @@ static pgut_option options[] =
5556
{ 's', 'B', "backup-path", &backup_path, SOURCE_ENV },
5657
/* common options */
5758
{ 'b', 'c', "check", &check },
59+
{ 'i', 'j', "check", &num_threads },
5860
/* backup options */
5961
{ 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_ENV },
6062
{ 'b', 'C', "smooth-checkpoint", &smooth_checkpoint, SOURCE_ENV },

pg_arman.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ extern const char *pgdata_exclude[];
202202
/* backup file list from non-snapshot */
203203
extern parray *backup_files_list;
204204

205+
extern int num_threads;
206+
205207
/* in backup.c */
206208
extern int do_backup(pgBackupOption bkupopt);
207209
extern BackupMode parse_backup_mode(const char *value);

sql/backup.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ pg_arman show -B ${BACKUP_PATH} > ${TEST_BASE}/TEST-0006.log 2>&1
8585
grep -c OK ${TEST_BASE}/TEST-0006.log
8686
grep OK ${TEST_BASE}/TEST-0006.log | sed -e 's@[^-]@@g' | wc -c | sed 's/^ *//'
8787

88+
echo '###### BACKUP COMMAND TEST-0007 ######'
89+
echo '###### ptrack multi thread backup mode ######'
90+
init_catalog
91+
pg_arman backup -B ${BACKUP_PATH} -b full -j 4 -p ${TEST_PGPORT} -d postgres --verbose > ${TEST_BASE}/TEST-0007-run.log 2>&1;echo $?
92+
pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0007-run.log 2>&1
93+
pg_arman backup -B ${BACKUP_PATH} -b ptrack -j 4 -p ${TEST_PGPORT} -d postgres --verbose > ${TEST_BASE}/TEST-0007-run.log 2>&1;echo $?
94+
pg_arman validate -B ${BACKUP_PATH} >> ${TEST_BASE}/TEST-0007-run.log 2>&1
95+
pg_arman show -B ${BACKUP_PATH} > ${TEST_BASE}/TEST-0007.log 2>&1
96+
grep -c OK ${TEST_BASE}/TEST-0007.log
97+
grep OK ${TEST_BASE}/TEST-0007.log | sed -e 's@[^-]@@g' | wc -c | sed 's/^ *//'
98+
8899
# cleanup
89100
## clean up the temporal test data
90101
pg_ctl stop -m immediate -D ${PGDATA_PATH} > /dev/null 2>&1

0 commit comments

Comments
 (0)