/*-------------------------------------------------------------------------
 *
 * basebackup_lz4.c
 *    Basebackup sink implementing lz4 compression.
 *
 * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/basebackup_lz4.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifdef HAVE_LIBLZ4
#include <lz4frame.h>
#endif

#include "replication/basebackup_sink.h"

#ifdef HAVE_LIBLZ4

typedef struct bbsink_lz4
{
    /* Common information for all types of sink. */
    bbsink      base;

    /* Compression level. */
    int         compresslevel;

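    /*
     * LZ4 streaming compression context and the frame preferences used when
     * compressing.
     */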
    LZ4F_compressionContext_t ctx;
    LZ4F_preferences_t prefs;

    /* Number of bytes staged in output buffer. */
    size_t      bytes_written;
} bbsink_lz4;

static void bbsink_lz4_begin_backup(bbsink *sink);
static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
static void bbsink_lz4_end_archive(bbsink *sink);
static void bbsink_lz4_cleanup(bbsink *sink);

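/*
 * Callback table for this sink.  Calls that require no lz4-specific
 * handling are forwarded directly to the next sink.
 */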
const bbsink_ops bbsink_lz4_ops = {
    .begin_backup = bbsink_lz4_begin_backup,
    .begin_archive = bbsink_lz4_begin_archive,
    .archive_contents = bbsink_lz4_archive_contents,
    .end_archive = bbsink_lz4_end_archive,
    .begin_manifest = bbsink_forward_begin_manifest,
    .manifest_contents = bbsink_lz4_manifest_contents,
    .end_manifest = bbsink_forward_end_manifest,
    .end_backup = bbsink_forward_end_backup,
    .cleanup = bbsink_lz4_cleanup
};
#endif

/*
 * Create a new basebackup sink that performs lz4 compression using the
 * designated compression level.
 */
bbsink *
bbsink_lz4_new(bbsink *next, int compresslevel)
{
#ifndef HAVE_LIBLZ4
    ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("lz4 compression is not supported by this build")));
#else
    bbsink_lz4 *sink;

    Assert(next != NULL);

    if (compresslevel < 0 || compresslevel > 12)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("lz4 compression level %d is out of range",
                        compresslevel)));

    sink = palloc0(sizeof(bbsink_lz4));
    *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
    sink->base.bbs_next = next;
    sink->compresslevel = compresslevel;

    return &sink->base;
#endif
}

#ifdef HAVE_LIBLZ4

/*
 * Begin backup.
 */
static void
bbsink_lz4_begin_backup(bbsink *sink)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    size_t      output_buffer_bound;
    LZ4F_preferences_t *prefs = &mysink->prefs;

    /* Initialize compressor object. */
    memset(prefs, 0, sizeof(LZ4F_preferences_t));
    prefs->frameInfo.blockSizeID = LZ4F_max256KB;
    prefs->compressionLevel = mysink->compresslevel;

    /*
     * We need our own buffer, because we're going to pass different data to
     * the next sink than what gets passed to us.
     */
    mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);

    /*
     * LZ4F_compressUpdate() requires an output buffer at least as large as
     * LZ4F_compressBound() for the given input size, so make sure the next
     * sink's bbs_buffer is long enough to accommodate the compressed
     * contents of our input buffer.
     */
    output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
                                             &mysink->prefs);

    /*
     * The buffer length is expected to be a multiple of BLCKSZ, so round up.
     */
    output_buffer_bound = output_buffer_bound + BLCKSZ -
        (output_buffer_bound % BLCKSZ);

    bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
}

/*
 * Prepare to compress the next archive.
 */
static void
bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    char       *lz4_archive_name;
    LZ4F_errorCode_t ctxError;
    size_t      headerSize;

    ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
    if (LZ4F_isError(ctxError))
        elog(ERROR, "could not create lz4 compression context: %s",
             LZ4F_getErrorName(ctxError));

    /* First of all, write the frame header into the destination buffer. */
    headerSize = LZ4F_compressBegin(mysink->ctx,
                                    mysink->base.bbs_next->bbs_buffer,
                                    mysink->base.bbs_next->bbs_buffer_length,
                                    &mysink->prefs);

    if (LZ4F_isError(headerSize))
        elog(ERROR, "could not write lz4 header: %s",
             LZ4F_getErrorName(headerSize));

    /*
     * The compressed data must be written after the header in the output
     * buffer, so update our notion of how many bytes have been written to
     * the output buffer.
     */
    mysink->bytes_written += headerSize;

    /* Add ".lz4" to the archive name. */
    lz4_archive_name = psprintf("%s.lz4", archive_name);
    Assert(sink->bbs_next != NULL);
    bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
    pfree(lz4_archive_name);
}

/*
 * Compress the input data into the output buffer until we run out of input
 * data.  Each time the remaining space in the output buffer falls below the
 * compression bound for the input buffer, invoke the archive_contents()
 * method for the next sink.
 *
 * Note that since we're compressing the input, it may very commonly happen
 * that we consume all the input data without filling the output buffer.  In
 * that case, the compressed representation of the current input data won't
 * actually be sent to the next bbsink until a later call to this function,
 * or perhaps not even until bbsink_lz4_end_archive() is invoked.
 */
static void
bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    size_t      compressedSize;
    size_t      avail_in_bound;

    avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);

    /*
     * If the space remaining in the output buffer has fallen below the value
     * computed by LZ4F_compressBound(), ask the next sink to process the
     * data so that we can empty the buffer.
     */
    if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <=
        avail_in_bound)
    {
        bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
        mysink->bytes_written = 0;
    }

    /*
     * Compress the input buffer and write the result into the output buffer.
     */
    compressedSize = LZ4F_compressUpdate(mysink->ctx,
                                         mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                         mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                         (uint8 *) mysink->base.bbs_buffer,
                                         avail_in,
                                         NULL);

    if (LZ4F_isError(compressedSize))
        elog(ERROR, "could not compress data: %s",
             LZ4F_getErrorName(compressedSize));

    /*
     * Update our notion of how many bytes we've written into output buffer.
     */
    mysink->bytes_written += compressedSize;
}

/*
 * There might be some data still residing in lz4's internal buffers; flush
 * it out, finalize the lz4 frame, and forward the result to the successor
 * sink as archive content.
 *
 * Then we can end processing for this archive.
 */
static void
bbsink_lz4_end_archive(bbsink *sink)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    size_t      compressedSize;
    size_t      lz4_footer_bound;

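    /*
     * LZ4F_compressBound() with a zero-byte input gives an upper bound on
     * the space needed to flush any buffered data and write the frame
     * footer.
     */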
    lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);

    Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);

    if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <=
        lz4_footer_bound)
    {
        bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
        mysink->bytes_written = 0;
    }

    compressedSize = LZ4F_compressEnd(mysink->ctx,
                                      mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                      mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                      NULL);

    if (LZ4F_isError(compressedSize))
        elog(ERROR, "could not end lz4 compression: %s",
             LZ4F_getErrorName(compressedSize));

    /* Update our notion of how many bytes we've written. */
    mysink->bytes_written += compressedSize;

    /* Send whatever accumulated output bytes we have. */
    bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
    mysink->bytes_written = 0;

    /* Release the resources. */
    LZ4F_freeCompressionContext(mysink->ctx);
    mysink->ctx = NULL;

    /* Pass on the information that this archive has ended. */
    bbsink_forward_end_archive(sink);
}

/*
 * Manifest contents are not compressed, but we do need to copy them into
 * the successor sink's buffer, because we have our own.
 */
static void
bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
{
    memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
    bbsink_manifest_contents(sink->bbs_next, len);
}

/*
 * In case the backup fails, make sure we free the compression context by
 * calling LZ4F_freeCompressionContext(), if needed, to avoid a memory leak.
 */
static void
bbsink_lz4_cleanup(bbsink *sink)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;

    if (mysink->ctx)
    {
        LZ4F_freeCompressionContext(mysink->ctx);
        mysink->ctx = NULL;
    }
}

#endif