Skip to content

Commit 310882b

Browse files
committed
Implement backup data compression. Add options --compress-algorithm, --compress-level
1 parent 94d4986 commit 310882b

File tree

6 files changed

+238
-18
lines changed

6 files changed

+238
-18
lines changed

configure.c

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "pg_probackup.h"
1111

1212
static void opt_log_level(pgut_option *opt, const char *arg);
13+
static void opt_compress_alg(pgut_option *opt, const char *arg);
1314

1415
static pgBackupConfig *cur_config = NULL;
1516

@@ -47,6 +48,11 @@ do_configure(bool show_only)
4748
if (retention_window)
4849
config->retention_window = retention_window;
4950

51+
if (compress_alg != NOT_DEFINED_COMPRESS)
52+
config->compress_alg = compress_alg;
53+
if (compress_level != -1)
54+
config->compress_level = compress_level;
55+
5056
if (show_only)
5157
writeBackupCatalogConfig(stderr, config);
5258
else
@@ -74,6 +80,9 @@ pgBackupConfigInit(pgBackupConfig *config)
7480

7581
config->retention_redundancy = 0;
7682
config->retention_window = 0;
83+
84+
config->compress_alg = NOT_DEFINED_COMPRESS;
85+
config->compress_level = -1;
7786
}
7887

7988
void
@@ -113,6 +122,15 @@ writeBackupCatalogConfig(FILE *out, pgBackupConfig *config)
113122
if (config->retention_window)
114123
fprintf(out, "retention-window = %u\n", config->retention_window);
115124

125+
fprintf(out, "#Compression parameters:\n");
126+
127+
fprintf(out, "compress-algorithm = %s\n", deparse_compress_alg(config->compress_alg));
128+
129+
/* if no value is set, print the default */
130+
if (config->compress_level == -1)
131+
fprintf(out, "compress-level = %u\n", DEFAULT_COMPRESS_LEVEL);
132+
else
133+
fprintf(out, "compress-level = %u\n", config->compress_level);
116134
}
117135

118136
void
@@ -143,6 +161,9 @@ readBackupCatalogConfigFile(void)
143161
/* retention options */
144162
{ 'u', 0, "retention-redundancy", &(config->retention_redundancy),SOURCE_FILE_STRICT },
145163
{ 'u', 0, "retention-window", &(config->retention_window), SOURCE_FILE_STRICT },
164+
/* compression options */
165+
{ 'f', 36, "compress-algorithm", opt_compress_alg, SOURCE_CMDLINE },
166+
{ 'u', 37, "compress-level", &(config->compress_level), SOURCE_CMDLINE },
146167
/* logging options */
147168
{ 'f', 40, "log-level", opt_log_level, SOURCE_CMDLINE },
148169
{ 's', 41, "log-filename", &(config->log_filename), SOURCE_CMDLINE },
@@ -177,3 +198,9 @@ opt_log_level(pgut_option *opt, const char *arg)
177198
{
178199
cur_config->log_level = parse_log_level(arg);
179200
}
201+
202+
static void
203+
opt_compress_alg(pgut_option *opt, const char *arg)
204+
{
205+
cur_config->compress_alg = parse_compress_alg(arg);
206+
}

data.c

Lines changed: 107 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,63 @@
1919
#include "storage/block.h"
2020
#include "storage/bufpage.h"
2121
#include "storage/checksum_impl.h"
22+
#include <common/pg_lzcompress.h>
23+
#include <zlib.h>
24+
25+
static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
26+
{
27+
uLongf compressed_size = dst_size;
28+
int rc = compress2(dst, &compressed_size, src, src_size, compress_level);
29+
return rc == Z_OK ? compressed_size : rc;
30+
}
31+
32+
static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
33+
{
34+
uLongf dest_len = dst_size;
35+
int rc = uncompress(dst, &dest_len, src, src_size);
36+
return rc == Z_OK ? dest_len : rc;
37+
}
38+
39+
static size_t
40+
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
41+
{
42+
switch (alg)
43+
{
44+
case NONE_COMPRESS:
45+
case NOT_DEFINED_COMPRESS:
46+
return -1;
47+
case ZLIB_COMPRESS:
48+
return zlib_compress(dst, dst_size, src, src_size);
49+
case PGLZ_COMPRESS:
50+
return pglz_compress(src, src_size, dst, PGLZ_strategy_always);
51+
}
52+
53+
return -1;
54+
}
55+
56+
static size_t
57+
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
58+
{
59+
switch (alg)
60+
{
61+
case NONE_COMPRESS:
62+
case NOT_DEFINED_COMPRESS:
63+
return -1;
64+
case ZLIB_COMPRESS:
65+
return zlib_decompress(dst, dst_size, src, src_size);
66+
case PGLZ_COMPRESS:
67+
return pglz_decompress(src, src_size, dst, dst_size);
68+
}
69+
70+
return -1;
71+
}
72+
73+
2274

2375
typedef struct BackupPageHeader
2476
{
2577
BlockNumber block; /* block number */
78+
int32 compressed_size;
2679
} BackupPageHeader;
2780

2881
/* Verify page's header */
@@ -61,8 +114,10 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
61114
BackupPageHeader header;
62115
off_t offset;
63116
DataPage page; /* used as read buffer */
64-
size_t write_buffer_size = sizeof(header) + BLCKSZ;
65-
char write_buffer[write_buffer_size];
117+
DataPage compressed_page; /* used as read buffer */
118+
size_t write_buffer_size;
119+
/* maximum size of write buffer */
120+
char write_buffer[BLCKSZ+sizeof(header)];
66121
size_t read_len = 0;
67122
XLogRecPtr page_lsn;
68123
int try_checksum = 100;
@@ -162,9 +217,31 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
162217

163218
file->read_size += read_len;
164219

165-
memcpy(write_buffer, &header, sizeof(header));
166-
/* TODO implement block compression here? */
167-
memcpy(write_buffer + sizeof(header), page.data, BLCKSZ);
220+
header.compressed_size = do_compress(compressed_page.data, sizeof(compressed_page.data),
221+
page.data, sizeof(page.data), compress_alg);
222+
223+
file->compress_alg = compress_alg;
224+
225+
Assert (header.compressed_size <= BLCKSZ);
226+
write_buffer_size = sizeof(header);
227+
228+
if (header.compressed_size > 0)
229+
{
230+
memcpy(write_buffer, &header, sizeof(header));
231+
memcpy(write_buffer + sizeof(header), compressed_page.data, header.compressed_size);
232+
write_buffer_size += MAXALIGN(header.compressed_size);
233+
}
234+
else
235+
{
236+
header.compressed_size = BLCKSZ;
237+
memcpy(write_buffer, &header, sizeof(header));
238+
memcpy(write_buffer + sizeof(header), page.data, BLCKSZ);
239+
write_buffer_size += header.compressed_size;
240+
}
241+
242+
/* Update CRC */
243+
COMP_CRC32C(*crc, &write_buffer, write_buffer_size);
244+
168245
/* write data page */
169246
if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size)
170247
{
@@ -175,10 +252,6 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
175252
file->path, blknum, strerror(errno_tmp));
176253
}
177254

178-
/* update CRC */
179-
COMP_CRC32C(*crc, &header, sizeof(header));
180-
COMP_CRC32C(*crc, page.data, BLCKSZ);
181-
182255
file->write_size += write_buffer_size;
183256
}
184257

@@ -468,7 +541,8 @@ restore_data_file(const char *from_root,
468541
for (blknum = 0; ; blknum++)
469542
{
470543
size_t read_len;
471-
DataPage page; /* used as read buffer */
544+
DataPage compressed_page; /* used as read buffer */
545+
DataPage page;
472546

473547
/* read BackupPageHeader */
474548
read_len = fread(&header, 1, sizeof(header), in);
@@ -482,18 +556,35 @@ restore_data_file(const char *from_root,
482556
"odd size page found at block %u of \"%s\"",
483557
blknum, file->path);
484558
else
485-
elog(ERROR, "cannot read block %u of \"%s\": %s",
559+
elog(ERROR, "cannot read header of block %u of \"%s\": %s",
486560
blknum, file->path, strerror(errno_tmp));
487561
}
488562

489563
if (header.block < blknum)
490-
elog(ERROR, "backup is broken at block %u",
491-
blknum);
564+
elog(ERROR, "backup is broken at block %u", blknum);
492565

566+
/* TODO fix this assert */
567+
Assert (header.compressed_size <= BLCKSZ);
493568

494-
if (fread(page.data, 1, BLCKSZ, in) != BLCKSZ)
495-
elog(ERROR, "cannot read block %u of \"%s\": %s",
496-
blknum, file->path, strerror(errno));
569+
read_len = fread(compressed_page.data, 1,
570+
MAXALIGN(header.compressed_size), in);
571+
if (read_len != MAXALIGN(header.compressed_size))
572+
elog(ERROR, "cannot read block %u of \"%s\" read %lu of %d",
573+
blknum, file->path, read_len, header.compressed_size);
574+
575+
if (header.compressed_size < BLCKSZ)
576+
{
577+
size_t uncompressed_size = 0;
578+
579+
uncompressed_size = do_decompress(page.data, BLCKSZ,
580+
compressed_page.data,
581+
header.compressed_size, file->compress_alg);
582+
583+
if (uncompressed_size != BLCKSZ)
584+
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", uncompressed_size);
585+
}
586+
else
587+
memcpy(page.data, compressed_page.data, BLCKSZ);
497588

498589
/* update checksum because we are not save whole */
499590
if(backup->checksum_version)

dir.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ pgFileInit(const char *path)
156156
strcpy(file->path, path); /* enough buffer size guaranteed */
157157
file->generation = -1;
158158
file->is_partial_copy = 0;
159+
file->compress_alg = NOT_DEFINED_COMPRESS;
159160
return file;
160161
}
161162

@@ -671,9 +672,9 @@ print_file_list(FILE *out, const parray *files, const char *root)
671672
path = GetRelativePath(path, root);
672673

673674
fprintf(out, "{\"path\":\"%s\", \"size\":\"%lu\",\"mode\":\"%u\","
674-
"\"is_datafile\":\"%u\", \"crc\":\"%u\"",
675+
"\"is_datafile\":\"%u\", \"crc\":\"%u\", \"compress_alg\":\"%s\"",
675676
path, (unsigned long) file->write_size, file->mode,
676-
file->is_datafile?1:0, file->crc);
677+
file->is_datafile?1:0, file->crc, deparse_compress_alg(file->compress_alg));
677678

678679
if (file->is_datafile)
679680
fprintf(out, ",\"segno\":\"%d\"", file->segno);
@@ -847,6 +848,7 @@ dir_read_file_list(const char *root, const char *file_txt)
847848
char path[MAXPGPATH];
848849
char filepath[MAXPGPATH];
849850
char linked[MAXPGPATH];
851+
char compress_alg_string[MAXPGPATH];
850852
uint64 write_size,
851853
mode, /* bit length of mode_t depends on platforms */
852854
is_datafile,
@@ -867,6 +869,7 @@ dir_read_file_list(const char *root, const char *file_txt)
867869
/* optional fields */
868870
get_control_value(buf, "linked", linked, NULL, false);
869871
get_control_value(buf, "segno", NULL, &segno, false);
872+
get_control_value(buf, "compress_alg", compress_alg_string, NULL, false);
870873

871874
#ifdef PGPRO_EE
872875
get_control_value(buf, "CFS_generation", NULL, &generation, true);
@@ -883,6 +886,7 @@ dir_read_file_list(const char *root, const char *file_txt)
883886
file->mode = (mode_t) mode;
884887
file->is_datafile = is_datafile ? true : false;
885888
file->crc = (pg_crc32) crc;
889+
file->compress_alg = parse_compress_alg(compress_alg_string);
886890
if (linked[0])
887891
file->linked = pgut_strdup(linked);
888892
file->segno = (int) segno;

help.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,16 @@ help_pg_probackup(void)
7575
printf(_(" [--log-rotation-age=log-rotation-age]\n"));
7676
printf(_(" [--retention-redundancy=retention-redundancy]\n"));
7777
printf(_(" [--retention-window=retention-window]\n"));
78+
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
79+
printf(_(" [--compress-level=compress-level]\n"));
7880

7981
printf(_("\n %s show-config -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
8082

8183
printf(_("\n %s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME);
8284
printf(_(" [-D pgdata-dir] [-C] [--stream [-S slot-name]] [--backup-pg-log]\n"));
8385
printf(_(" [-j num-threads] [--archive-timeout=archive-timeout]\n"));
86+
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
87+
printf(_(" [--compress-level=compress-level]\n"));
8488
printf(_(" [--progress] [--delete-expired]\n"));
8589
printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n"));
8690

@@ -132,6 +136,8 @@ help_backup(void)
132136
printf(_(" [-D pgdata-dir] [-C] [--stream [-S slot-name]] [--backup-pg-log]\n"));
133137
printf(_(" [-j num-threads] [--archive-timeout=archive-timeout]\n"));
134138
printf(_(" [--progress] [--delete-expired]\n"));
139+
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
140+
printf(_(" [--compress-level=compress-level]\n"));
135141
printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n\n"));
136142

137143
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
@@ -148,6 +154,11 @@ help_backup(void)
148154
printf(_(" --delete-expired delete backups expired according to current\n"));
149155
printf(_(" retention policy after successful backup completion\n"));
150156

157+
printf(_("\n Compression options:\n"));
158+
printf(_(" --compress-algorithm=compress-algorithm\n"));
159+
printf(_(" available options: 'zlib','pglz','none'\n"));
160+
printf(_(" --compress-level=compress-level level of compression [0-9]\n"));
161+
151162
printf(_("\n Connection options:\n"));
152163
printf(_(" -d, --dbname=DBNAME database to connect\n"));
153164
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
@@ -236,6 +247,8 @@ help_set_config(void)
236247
printf(_(" [--log-rotation-age=log-rotation-age]\n"));
237248
printf(_(" [--retention-redundancy=retention-redundancy]\n"));
238249
printf(_(" [--retention-window=retention-window]\n\n"));
250+
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
251+
printf(_(" [--compress-level=compress-level]\n"));
239252

240253
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
241254
printf(_(" --instance=instance_name name of the instance\n"));
@@ -264,6 +277,11 @@ help_set_config(void)
264277
printf(_(" --retention-window=retention-window\n"));
265278
printf(_(" number of days of recoverability\n"));
266279

280+
printf(_("\n Compression options:\n"));
281+
printf(_(" --compress-algorithm=compress-algorithm\n"));
282+
printf(_(" available options: 'zlib','pglz','none'\n"));
283+
printf(_(" --compress-level=compress-level\n"));
284+
printf(_(" level of compression [0-9]\n"));
267285
}
268286

269287
static void

0 commit comments

Comments
 (0)