Skip to content

Commit 51c0d18

Browse files
committed
Allow parallel zstd compression when taking a base backup.
libzstd allows transparent parallel compression just by setting an option when creating the compression context, so permit that for both client-side and server-side backup compression. To enable it, use something like pg_basebackup --compress WHERE-zstd:workers=N, where WHERE is "client" or "server" and N is an integer. When compression is performed on the server side, this will spawn threads inside the PostgreSQL backend. While there is almost no PostgreSQL server code which is thread-safe, the threads here are used internally by libzstd and touch only data structures controlled by libzstd. Patch by me, based in part on earlier work by Dipesh Pandit and Jeevan Ladhe. Reviewed by Justin Pryzby. Discussion: http://postgr.es/m/CA+Tgmobj6u-nWF-j=FemygUhobhryLxf9h-wJN7W-2rSsseHNA@mail.gmail.com
1 parent c6863b8 commit 51c0d18

File tree

9 files changed

+147
-39
lines changed

9 files changed

+147
-39
lines changed

doc/src/sgml/protocol.sgml

+9-3
Original file line numberDiff line numberDiff line change
@@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
27392739
option. If the value is an integer, it specifies the compression
27402740
level. Otherwise, it should be a comma-separated list of items,
27412741
each of the form <literal>keyword</literal> or
2742-
<literal>keyword=value</literal>. Currently, the only supported
2743-
keyword is <literal>level</literal>, which sets the compression
2744-
level.
2742+
<literal>keyword=value</literal>. Currently, the supported keywords
2743+
are <literal>level</literal> and <literal>workers</literal>.
27452744
</para>
27462745

27472746
<para>
2747+
The <literal>level</literal> keyword sets the compression level.
27482748
For <literal>gzip</literal> the compression level should be an
27492749
integer between 1 and 9, for <literal>lz4</literal> an integer
27502750
between 1 and 12, and for <literal>zstd</literal> an integer
27512751
between 1 and 22.
27522752
</para>
2753+
2754+
<para>
2755+
The <literal>workers</literal> keyword sets the number of threads
2756+
that should be used for parallel compression. Parallel compression
2757+
is supported only for <literal>zstd</literal>.
2758+
</para>
27532759
</listitem>
27542760
</varlistentry>
27552761

doc/src/sgml/ref/pg_basebackup.sgml

+2-2
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,8 @@ PostgreSQL documentation
424424
integer, it specifies the compression level. Otherwise, it should be
425425
a comma-separated list of items, each of the form
426426
<literal>keyword</literal> or <literal>keyword=value</literal>.
427-
Currently, the only supported keyword is <literal>level</literal>,
428-
which sets the compression level.
427+
Currently, the supported keywords are <literal>level</literal>
428+
and <literal>workers</literal>.
429429
</para>
430430
<para>
431431
If no compression level is specified, the default compression level

src/backend/replication/basebackup_zstd.c

+28-17
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ typedef struct bbsink_zstd
2525
/* Common information for all types of sink. */
2626
bbsink base;
2727

28-
/* Compression level */
29-
int compresslevel;
28+
/* Compression options */
29+
bc_specification *compress;
3030

3131
ZSTD_CCtx *cctx;
3232
ZSTD_outBuffer zstd_outBuf;
@@ -67,22 +67,13 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
6767
return NULL; /* keep compiler quiet */
6868
#else
6969
bbsink_zstd *sink;
70-
int compresslevel;
7170

7271
Assert(next != NULL);
7372

74-
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
75-
compresslevel = 0;
76-
else
77-
{
78-
compresslevel = compress->level;
79-
Assert(compresslevel >= 1 && compresslevel <= 22);
80-
}
81-
8273
sink = palloc0(sizeof(bbsink_zstd));
8374
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
8475
sink->base.bbs_next = next;
85-
sink->compresslevel = compresslevel;
76+
sink->compress = compress;
8677

8778
return &sink->base;
8879
#endif
@@ -99,16 +90,36 @@ bbsink_zstd_begin_backup(bbsink *sink)
9990
bbsink_zstd *mysink = (bbsink_zstd *) sink;
10091
size_t output_buffer_bound;
10192
size_t ret;
93+
bc_specification *compress = mysink->compress;
10294

10395
mysink->cctx = ZSTD_createCCtx();
10496
if (!mysink->cctx)
10597
elog(ERROR, "could not create zstd compression context");
10698

107-
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
108-
mysink->compresslevel);
109-
if (ZSTD_isError(ret))
110-
elog(ERROR, "could not set zstd compression level to %d: %s",
111-
mysink->compresslevel, ZSTD_getErrorName(ret));
99+
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
100+
{
101+
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
102+
compress->level);
103+
if (ZSTD_isError(ret))
104+
elog(ERROR, "could not set zstd compression level to %d: %s",
105+
compress->level, ZSTD_getErrorName(ret));
106+
}
107+
108+
if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
109+
{
110+
/*
111+
* On older versions of libzstd, this option does not exist, and trying
112+
* to set it will fail. Similarly for newer versions if they are
113+
* compiled without threading support.
114+
*/
115+
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
116+
compress->workers);
117+
if (ZSTD_isError(ret))
118+
ereport(ERROR,
119+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120+
errmsg("could not set compression worker count to %d: %s",
121+
compress->workers, ZSTD_getErrorName(ret)));
122+
}
112123

113124
/*
114125
* We need our own buffer, because we're going to pass different data to

src/bin/pg_basebackup/bbstreamer_zstd.c

+28-12
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
6767
{
6868
#ifdef USE_ZSTD
6969
bbstreamer_zstd_frame *streamer;
70-
int compresslevel;
7170
size_t ret;
7271

7372
Assert(next != NULL);
@@ -88,18 +87,35 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
8887
exit(1);
8988
}
9089

91-
/* Initialize stream compression preferences */
92-
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
93-
compresslevel = 0;
94-
else
95-
compresslevel = compress->level;
96-
ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
97-
compresslevel);
98-
if (ZSTD_isError(ret))
90+
/* Set compression level, if specified */
91+
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
9992
{
100-
pg_log_error("could not set zstd compression level to %d: %s",
101-
compresslevel, ZSTD_getErrorName(ret));
102-
exit(1);
93+
ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
94+
compress->level);
95+
if (ZSTD_isError(ret))
96+
{
97+
pg_log_error("could not set zstd compression level to %d: %s",
98+
compress->level, ZSTD_getErrorName(ret));
99+
exit(1);
100+
}
101+
}
102+
103+
/* Set # of workers, if specified */
104+
if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
105+
{
106+
/*
107+
* On older versions of libzstd, this option does not exist, and
108+
* trying to set it will fail. Similarly for newer versions if they
109+
* are compiled without threading support.
110+
*/
111+
ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
112+
compress->workers);
113+
if (ZSTD_isError(ret))
114+
{
115+
pg_log_error("could not set compression worker count to %d: %s",
116+
compress->workers, ZSTD_getErrorName(ret));
117+
exit(1);
118+
}
103119
}
104120

105121
/* Initialize the ZSTD output buffer. */

src/bin/pg_basebackup/t/010_pg_basebackup.pl

+5
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@
130130
'invalid compression specification: found empty string where a compression option was expected',
131131
'failure on extra, empty compression option'
132132
],
133+
[
134+
'gzip:workers=3',
135+
'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
136+
'failure on worker count for gzip'
137+
],
133138
);
134139
for my $cft (@compression_failure_tests)
135140
{

src/bin/pg_verifybackup/t/009_extract.pl

+27-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
'compression_method' => 'zstd',
3535
'backup_flags' => ['--compress', 'server-zstd:5'],
3636
'enabled' => check_pg_config("#define USE_ZSTD 1")
37+
},
38+
{
39+
'compression_method' => 'parallel zstd',
40+
'backup_flags' => ['--compress', 'server-zstd:workers=3'],
41+
'enabled' => check_pg_config("#define USE_ZSTD 1"),
42+
'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
3743
}
3844
);
3945

@@ -55,8 +61,27 @@
5561
my @verify = ('pg_verifybackup', '-e', $backup_path);
5662

5763
# A backup with a valid compression method should work.
58-
$primary->command_ok(\@backup,
59-
"backup done, compression method \"$method\"");
64+
my $backup_stdout = '';
65+
my $backup_stderr = '';
66+
my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
67+
'2>', \$backup_stderr);
68+
if ($backup_stdout ne '')
69+
{
70+
print "# standard output was:\n$backup_stdout";
71+
}
72+
if ($backup_stderr ne '')
73+
{
74+
print "# standard error was:\n$backup_stderr";
75+
}
76+
if (! $backup_result && $tc->{'possibly_unsupported'} &&
77+
$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
78+
{
79+
skip "compression with $method not supported by this build", 2;
80+
}
81+
else
82+
{
83+
ok($backup_result, "backup done, compression $method");
84+
}
6085

6186
# Make sure that it verifies OK.
6287
$primary->command_ok(\@verify,

src/bin/pg_verifybackup/t/010_client_untar.pl

+30-3
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@
4949
'decompress_program' => $ENV{'ZSTD'},
5050
'decompress_flags' => [ '-d' ],
5151
'enabled' => check_pg_config("#define USE_ZSTD 1")
52+
},
53+
{
54+
'compression_method' => 'parallel zstd',
55+
'backup_flags' => ['--compress', 'client-zstd:workers=3'],
56+
'backup_archive' => 'base.tar.zst',
57+
'decompress_program' => $ENV{'ZSTD'},
58+
'decompress_flags' => [ '-d' ],
59+
'enabled' => check_pg_config("#define USE_ZSTD 1"),
60+
'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
5261
}
5362
);
5463

@@ -69,9 +78,27 @@
6978
'pg_basebackup', '-D', $backup_path,
7079
'-Xfetch', '--no-sync', '-cfast', '-Ft');
7180
push @backup, @{$tc->{'backup_flags'}};
72-
$primary->command_ok(\@backup,
73-
"client side backup, compression $method");
74-
81+
my $backup_stdout = '';
82+
my $backup_stderr = '';
83+
my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
84+
'2>', \$backup_stderr);
85+
if ($backup_stdout ne '')
86+
{
87+
print "# standard output was:\n$backup_stdout";
88+
}
89+
if ($backup_stderr ne '')
90+
{
91+
print "# standard error was:\n$backup_stderr";
92+
}
93+
if (! $backup_result && $tc->{'possibly_unsupported'} &&
94+
$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
95+
{
96+
skip "compression with $method not supported by this build", 3;
97+
}
98+
else
99+
{
100+
ok($backup_result, "client side backup, compression $method");
101+
}
75102

76103
# Verify that we got the files we expected.
77104
my $backup_files = join(',',

src/common/backup_compression.c

+16
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
177177
result->level = expect_integer_value(keyword, value, result);
178178
result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
179179
}
180+
else if (strcmp(keyword, "workers") == 0)
181+
{
182+
result->workers = expect_integer_value(keyword, value, result);
183+
result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
184+
}
180185
else
181186
result->parse_error =
182187
psprintf(_("unknown compression option \"%s\""), keyword);
@@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
266271
min_level, max_level);
267272
}
268273

274+
/*
275+
* Of the compression algorithms that we currently support, only zstd
276+
* allows parallel workers.
277+
*/
278+
if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
279+
(spec->algorithm != BACKUP_COMPRESSION_ZSTD))
280+
{
281+
return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
282+
get_bc_algorithm_name(spec->algorithm));
283+
}
284+
269285
return NULL;
270286
}

src/include/common/backup_compression.h

+2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ typedef enum bc_algorithm
2323
} bc_algorithm;
2424

2525
#define BACKUP_COMPRESSION_OPTION_LEVEL (1 << 0)
26+
#define BACKUP_COMPRESSION_OPTION_WORKERS (1 << 1)
2627

2728
typedef struct bc_specification
2829
{
2930
bc_algorithm algorithm;
3031
unsigned options; /* OR of BACKUP_COMPRESSION_OPTION constants */
3132
int level;
33+
int workers;
3234
char *parse_error; /* NULL if parsing was OK, else message */
3335
} bc_specification;
3436

0 commit comments

Comments
 (0)