Skip to content

Commit dab2984

Browse files
committed
Add suport for server-side LZ4 base backup compression.
LZ4 compression can be a lot faster than gzip compression, so users may prefer it even if the compression ratio is not as good. We will want pg_basebackup to support LZ4 compression and decompression on the client side as well, and there is a pending patch for that, but it's by a different author, so I am committing this part separately for that reason. Jeevan Ladhe, reviewed by Tushar Ahuja and by me. Discussion: http://postgr.es/m/CANm22Cg9cArXEaYgHVZhCnzPLfqXCZLAzjwTq7Fc0quXRPfbxA@mail.gmail.com
1 parent a745b93 commit dab2984

File tree

9 files changed

+349
-18
lines changed

9 files changed

+349
-18
lines changed

doc/src/sgml/protocol.sgml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2724,8 +2724,8 @@ The commands accepted in replication mode are:
27242724
<listitem>
27252725
<para>
27262726
Instructs the server to compress the backup using the specified
2727-
method. Currently, the only supported method is
2728-
<literal>gzip</literal>.
2727+
method. Currently, the supported methods are <literal>gzip</literal>
2728+
and <literal>lz4</literal>.
27292729
</para>
27302730
</listitem>
27312731
</varlistentry>
@@ -2736,7 +2736,8 @@ The commands accepted in replication mode are:
27362736
<para>
27372737
Specifies the compression level to be used. This should only be
27382738
used in conjunction with the <literal>COMPRESSION</literal> option.
2739-
The value should be an integer between 1 and 9.
2739+
For <literal>gzip</literal> the value should be an integer between 1
2740+
and 9, and for <literal>lz4</literal> it should be between 1 and 12.
27402741
</para>
27412742
</listitem>
27422743
</varlistentry>

doc/src/sgml/ref/pg_basebackup.sgml

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -417,23 +417,27 @@ PostgreSQL documentation
417417
specify <literal>-Xfetch</literal>.
418418
</para>
419419
<para>
420-
The compression method can be set to either <literal>gzip</literal>
421-
for compression with <application>gzip</application>, or
422-
<literal>none</literal> for no compression. A compression level
423-
can be optionally specified, by appending the level number after a
420+
The compression method can be set to <literal>gzip</literal> for
421+
compression with <application>gzip</application>, or
422+
<literal>lz4</literal> for compression with
423+
<application>lz4</application>, or <literal>none</literal> for no
424+
compression. However, <literal>lz4</literal> can be currently only
425+
used with <literal>server</literal>. A compression level can be
426+
optionally specified, by appending the level number after a
424427
colon (<literal>:</literal>). If no level is specified, the default
425428
compression level will be used. If only a level is specified without
426429
mentioning an algorithm, <literal>gzip</literal> compression will
427430
be used if the level is greater than 0, and no compression will be
428431
used if the level is 0.
429432
</para>
430433
<para>
431-
When the tar format is used, the suffix <filename>.gz</filename> will
432-
automatically be added to all tar filenames. When the plain format is
433-
used, client-side compression may not be specified, but it is
434-
still possible to request server-side compression. If this is done,
435-
the server will compress the backup for transmission, and the
436-
client will decompress and extract it.
434+
When the tar format is used with <literal>gzip</literal> or
435+
<literal>lz4</literal>, the suffix <filename>.gz</filename> or
436+
<filename>.lz4</filename> will automatically be added to all tar
437+
filenames. When the plain format is used, client-side compression may
438+
not be specified, but it is still possible to request server-side
439+
compression. If this is done, the server will compress the backup for
440+
transmission, and the client will decompress and extract it.
437441
</para>
438442
</listitem>
439443
</varlistentry>

src/backend/replication/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ OBJS = \
1919
basebackup.o \
2020
basebackup_copy.o \
2121
basebackup_gzip.o \
22+
basebackup_lz4.o \
2223
basebackup_progress.o \
2324
basebackup_server.o \
2425
basebackup_sink.o \

src/backend/replication/basebackup.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ typedef enum
6363
typedef enum
6464
{
6565
BACKUP_COMPRESSION_NONE,
66-
BACKUP_COMPRESSION_GZIP
66+
BACKUP_COMPRESSION_GZIP,
67+
BACKUP_COMPRESSION_LZ4
6768
} basebackup_compression_type;
6869

6970
typedef struct
@@ -903,6 +904,8 @@ parse_basebackup_options(List *options, basebackup_options *opt)
903904
opt->compression = BACKUP_COMPRESSION_NONE;
904905
else if (strcmp(optval, "gzip") == 0)
905906
opt->compression = BACKUP_COMPRESSION_GZIP;
907+
else if (strcmp(optval, "lz4") == 0)
908+
opt->compression = BACKUP_COMPRESSION_LZ4;
906909
else
907910
ereport(ERROR,
908911
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1021,6 +1024,8 @@ SendBaseBackup(BaseBackupCmd *cmd)
10211024
/* Set up server-side compression, if client requested it */
10221025
if (opt.compression == BACKUP_COMPRESSION_GZIP)
10231026
sink = bbsink_gzip_new(sink, opt.compression_level);
1027+
else if (opt.compression == BACKUP_COMPRESSION_LZ4)
1028+
sink = bbsink_lz4_new(sink, opt.compression_level);
10241029

10251030
/* Set up progress reporting. */
10261031
sink = bbsink_progress_new(sink, opt.progress);
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* basebackup_lz4.c
4+
* Basebackup sink implementing lz4 compression.
5+
*
6+
* Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
7+
*
8+
* IDENTIFICATION
9+
* src/backend/replication/basebackup_lz4.c
10+
*
11+
*-------------------------------------------------------------------------
12+
*/
13+
#include "postgres.h"
14+
15+
#ifdef HAVE_LIBLZ4
16+
#include <lz4frame.h>
17+
#endif
18+
19+
#include "replication/basebackup_sink.h"
20+
21+
#ifdef HAVE_LIBLZ4
22+
23+
typedef struct bbsink_lz4
24+
{
25+
/* Common information for all types of sink. */
26+
bbsink base;
27+
28+
/* Compression level. */
29+
int compresslevel;
30+
31+
LZ4F_compressionContext_t ctx;
32+
LZ4F_preferences_t prefs;
33+
34+
/* Number of bytes staged in output buffer. */
35+
size_t bytes_written;
36+
} bbsink_lz4;
37+
38+
static void bbsink_lz4_begin_backup(bbsink *sink);
39+
static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
40+
static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
41+
static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
42+
static void bbsink_lz4_end_archive(bbsink *sink);
43+
static void bbsink_lz4_cleanup(bbsink *sink);
44+
45+
const bbsink_ops bbsink_lz4_ops = {
46+
.begin_backup = bbsink_lz4_begin_backup,
47+
.begin_archive = bbsink_lz4_begin_archive,
48+
.archive_contents = bbsink_lz4_archive_contents,
49+
.end_archive = bbsink_lz4_end_archive,
50+
.begin_manifest = bbsink_forward_begin_manifest,
51+
.manifest_contents = bbsink_lz4_manifest_contents,
52+
.end_manifest = bbsink_forward_end_manifest,
53+
.end_backup = bbsink_forward_end_backup,
54+
.cleanup = bbsink_lz4_cleanup
55+
};
56+
#endif
57+
58+
/*
59+
* Create a new basebackup sink that performs lz4 compression using the
60+
* designated compression level.
61+
*/
62+
bbsink *
63+
bbsink_lz4_new(bbsink *next, int compresslevel)
64+
{
65+
#ifndef HAVE_LIBLZ4
66+
ereport(ERROR,
67+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
68+
errmsg("lz4 compression is not supported by this build")));
69+
#else
70+
bbsink_lz4 *sink;
71+
72+
Assert(next != NULL);
73+
74+
if (compresslevel < 0 || compresslevel > 12)
75+
ereport(ERROR,
76+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
77+
errmsg("lz4 compression level %d is out of range",
78+
compresslevel)));
79+
80+
sink = palloc0(sizeof(bbsink_lz4));
81+
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
82+
sink->base.bbs_next = next;
83+
sink->compresslevel = compresslevel;
84+
85+
return &sink->base;
86+
#endif
87+
}
88+
89+
#ifdef HAVE_LIBLZ4
90+
91+
/*
92+
* Begin backup.
93+
*/
94+
static void
95+
bbsink_lz4_begin_backup(bbsink *sink)
96+
{
97+
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
98+
size_t output_buffer_bound;
99+
LZ4F_preferences_t *prefs = &mysink->prefs;
100+
101+
/* Initialize compressor object. */
102+
memset(prefs, 0, sizeof(LZ4F_preferences_t));
103+
prefs->frameInfo.blockSizeID = LZ4F_max256KB;
104+
prefs->compressionLevel = mysink->compresslevel;
105+
106+
/*
107+
* We need our own buffer, because we're going to pass different data to
108+
* the next sink than what gets passed to us.
109+
*/
110+
mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
111+
112+
/*
113+
* Since LZ4F_compressUpdate() requires the output buffer of size equal or
114+
* greater than that of LZ4F_compressBound(), make sure we have the next
115+
* sink's bbs_buffer of length that can accommodate the compressed input
116+
* buffer.
117+
*/
118+
output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
119+
&mysink->prefs);
120+
121+
/*
122+
* The buffer length is expected to be a multiple of BLCKSZ, so round up.
123+
*/
124+
output_buffer_bound = output_buffer_bound + BLCKSZ -
125+
(output_buffer_bound % BLCKSZ);
126+
127+
bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
128+
}
129+
130+
/*
131+
* Prepare to compress the next archive.
132+
*/
133+
static void
134+
bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
135+
{
136+
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
137+
char *lz4_archive_name;
138+
LZ4F_errorCode_t ctxError;
139+
size_t headerSize;
140+
141+
ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
142+
if (LZ4F_isError(ctxError))
143+
elog(ERROR, "could not create lz4 compression context: %s",
144+
LZ4F_getErrorName(ctxError));
145+
146+
/* First of all write the frame header to destination buffer. */
147+
headerSize = LZ4F_compressBegin(mysink->ctx,
148+
mysink->base.bbs_next->bbs_buffer,
149+
mysink->base.bbs_next->bbs_buffer_length,
150+
&mysink->prefs);
151+
152+
if (LZ4F_isError(headerSize))
153+
elog(ERROR, "could not write lz4 header: %s",
154+
LZ4F_getErrorName(headerSize));
155+
156+
/*
157+
* We need to write the compressed data after the header in the output
158+
* buffer. So, make sure to update the notion of bytes written to output
159+
* buffer.
160+
*/
161+
mysink->bytes_written += headerSize;
162+
163+
/* Add ".lz4" to the archive name. */
164+
lz4_archive_name = psprintf("%s.lz4", archive_name);
165+
Assert(sink->bbs_next != NULL);
166+
bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
167+
pfree(lz4_archive_name);
168+
}
169+
170+
/*
171+
* Compress the input data to the output buffer until we run out of input
172+
* data. Each time the output buffer falls below the compression bound for
173+
* the input buffer, invoke the archive_contents() method for then next sink.
174+
*
175+
* Note that since we're compressing the input, it may very commonly happen
176+
* that we consume all the input data without filling the output buffer. In
177+
* that case, the compressed representation of the current input data won't
178+
* actually be sent to the next bbsink until a later call to this function,
179+
* or perhaps even not until bbsink_lz4_end_archive() is invoked.
180+
*/
181+
static void
182+
bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
183+
{
184+
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
185+
size_t compressedSize;
186+
size_t avail_in_bound;
187+
188+
avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
189+
190+
/*
191+
* If the number of available bytes has fallen below the value computed by
192+
* LZ4F_compressBound(), ask the next sink to process the data so that we
193+
* can empty the buffer.
194+
*/
195+
if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <=
196+
avail_in_bound)
197+
{
198+
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
199+
mysink->bytes_written = 0;
200+
}
201+
202+
/*
203+
* Compress the input buffer and write it into the output buffer.
204+
*/
205+
compressedSize = LZ4F_compressUpdate(mysink->ctx,
206+
mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
207+
mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
208+
(uint8 *) mysink->base.bbs_buffer,
209+
avail_in,
210+
NULL);
211+
212+
if (LZ4F_isError(compressedSize))
213+
elog(ERROR, "could not compress data: %s",
214+
LZ4F_getErrorName(compressedSize));
215+
216+
/*
217+
* Update our notion of how many bytes we've written into output buffer.
218+
*/
219+
mysink->bytes_written += compressedSize;
220+
}
221+
222+
/*
223+
* There might be some data inside lz4's internal buffers; we need to get
224+
* that flushed out and also finalize the lz4 frame and then get that forwarded
225+
* to the successor sink as archive content.
226+
*
227+
* Then we can end processing for this archive.
228+
*/
229+
static void
230+
bbsink_lz4_end_archive(bbsink *sink)
231+
{
232+
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
233+
size_t compressedSize;
234+
size_t lz4_footer_bound;
235+
236+
lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
237+
238+
Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
239+
240+
if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <=
241+
lz4_footer_bound)
242+
{
243+
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
244+
mysink->bytes_written = 0;
245+
}
246+
247+
compressedSize = LZ4F_compressEnd(mysink->ctx,
248+
mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
249+
mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
250+
NULL);
251+
252+
if (LZ4F_isError(compressedSize))
253+
elog(ERROR, "could not end lz4 compression: %s",
254+
LZ4F_getErrorName(compressedSize));
255+
256+
/* Update our notion of how many bytes we've written. */
257+
mysink->bytes_written += compressedSize;
258+
259+
/* Send whatever accumulated output bytes we have. */
260+
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
261+
mysink->bytes_written = 0;
262+
263+
/* Release the resources. */
264+
LZ4F_freeCompressionContext(mysink->ctx);
265+
mysink->ctx = NULL;
266+
267+
/* Pass on the information that this archive has ended. */
268+
bbsink_forward_end_archive(sink);
269+
}
270+
271+
/*
272+
* Manifest contents are not compressed, but we do need to copy them into
273+
* the successor sink's buffer, because we have our own.
274+
*/
275+
static void
276+
bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
277+
{
278+
memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
279+
bbsink_manifest_contents(sink->bbs_next, len);
280+
}
281+
282+
/*
283+
* In case the backup fails, make sure we free the compression context by
284+
* calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
285+
*/
286+
static void
287+
bbsink_lz4_cleanup(bbsink *sink)
288+
{
289+
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
290+
291+
if (mysink->ctx)
292+
{
293+
LZ4F_freeCompressionContext(mysink->ctx);
294+
mysink->ctx = NULL;
295+
}
296+
}
297+
298+
#endif

0 commit comments

Comments
 (0)