11
11
*
12
12
* NOTES:
13
13
*
14
- * This file implemets compression of file pages.
14
+ * This file implements compression of file pages.
15
15
* Updated compressed pages are always appended to the end of file segment.
16
- * Garbage collector is used to shrink files when them become tool large.
17
- * GC is spawned as one or more background workers. Them recursively traverse all tablespace directories,
18
- * find out *.cfm files are if logical size of the file is twice larger than physical size of the file
19
- * performs compactification. Locking implemented using atomic operations is used to eliminate race
20
- * conditions.
16
+ * Garbage collector is used to reclaim storage occupied by outdated versions of pages.
17
+ * GC runs one or more background workers which recursively traverse all tablespace
18
+ * directories. If worker finds out that logical size of the file is twice as large as
19
+ * physical size of the file, it performs compactification.
20
+ * To eliminate race conditions, files are locked during compacification.
21
+ * Locks are implemented with atomic operations.
22
+ *
23
+ * TODO Write a note about *.cfm files.
21
24
*/
22
25
23
26
#include "postgres.h"
51
54
#include "utils/resowner_private.h"
52
55
#include "postmaster/bgworker.h"
53
56
57
+ /*
58
+ * GUC variable that defines compression level.
59
+ * 0 - no compression, 1 - max speed,
60
+ * other possible values depend on the specific algorithm.
61
+ * Default value is 1.
62
+ */
63
+ int cfs_level ;
64
+ /*
65
+ * GUC variable that defines if encryption of compressed pages is enabled.
66
+ * Default value is false.
67
+ */
68
+ bool cfs_encryption ;
69
+ /*
70
+ * GUC variable - Verify correctness of data written by GC.
71
+ * TODO add description and documentation.
72
+ */
73
+ bool cfs_gc_verify_file ;
74
+
75
+ /* GUC variable - Number of garbage collection background workers. Default = 1 */
54
76
int cfs_gc_workers ;
77
+ /*
78
+ * GUC variable - Specifies the minimum percent of garbage blocks
79
+ * needed to trigger a GC of the file. Default = 50
80
+ */
55
81
int cfs_gc_threshold ;
82
+ /* GUC variable - Time to sleep between GC runs in milliseconds. Default = 5000 */
56
83
int cfs_gc_period ;
84
+ /* GUC variable - Delay in milliseconds between files defragmentation. Default = 0
85
+ * TODO What is the purpose of this variable?
86
+ */
57
87
int cfs_gc_delay ;
58
- int cfs_level ;
59
- bool cfs_encryption ;
60
- bool cfs_gc_verify_file ;
61
88
62
89
static bool cfs_read_file (int fd , void * data , uint32 size );
63
90
static bool cfs_write_file (int fd , void const * data , uint32 size );
@@ -68,90 +95,109 @@ CfsState* cfs_state;
68
95
static bool cfs_stop ;
69
96
static int cfs_processed_segments ;
70
97
71
- #if CFS_COMPRESSOR == SNAPPY_COMPRESSOR
72
98
73
- #include <snappy-c.h>
99
+ /* ----------------------------------------------------------------
100
+ * Section 1: Various compression algorithms.
101
+ * CFS_COMPRESSOR variable can be set at compile time.
102
+ * One should define CFS_COMPRESSOR in cfs.h
103
+ * Availiable options are:
104
+ * - LZ_COMPRESSOR // FIXME. No actual implementation.
105
+ * - ZLIB_COMPRESSOR
106
+ * - LZ4_COMPRESSOR
107
+ * - SNAPPY_COMPRESSOR
108
+ * - LCFSE_COMPRESSOR
109
+ * - ZSTD_COMPRESSOR
110
+ *
111
+ * If none of options is chosen, use standard pglz_compress FIXME?, which is
112
+ * slow and non-efficient in comparison with others, but doesn't requre
113
+ * any extra libraries.
114
+ * ----------------------------------------------------------------
115
+ */
116
+
117
+ #if CFS_COMPRESSOR == ZLIB_COMPRESSOR
118
+
119
+ #include <zlib.h>
74
120
75
121
size_t cfs_compress (void * dst , size_t dst_size , void const * src , size_t src_size )
76
122
{
77
- return snappy_compress (src , src_size , dst , & dst_size ) == SNAPPY_OK ? dst_size : 0 ;
123
+ uLongf compressed_size = dst_size ;
124
+ int rc = compress2 (dst , & compressed_size , src , src_size , cfs_level );
125
+ return rc == Z_OK ? compressed_size : rc ;
78
126
}
79
127
80
128
size_t cfs_decompress (void * dst , size_t dst_size , void const * src , size_t src_size )
81
129
{
82
- return snappy_uncompress (src , src_size , dst , & dst_size ) == SNAPPY_OK ? dst_size : 0 ;
130
+ uLongf dest_len = dst_size ;
131
+ int rc = uncompress (dst , & dest_len , src , src_size );
132
+ return rc == Z_OK ? dest_len : rc ;
83
133
}
84
134
85
135
char const * cfs_algorithm ()
86
136
{
87
- return "snappy " ;
137
+ return "zlib " ;
88
138
}
89
139
90
- #elif CFS_COMPRESSOR == LCFSE_COMPRESSOR
140
+ #elif CFS_COMPRESSOR == LZ4_COMPRESSOR
91
141
92
- #include <lcfse .h>
142
+ #include <lz4 .h>
93
143
94
144
size_t cfs_compress (void * dst , size_t dst_size , void const * src , size_t src_size )
95
145
{
96
- char * scratch_buf = palloc (lcfse_encode_scratch_size ());
97
- size_t rc = lcfse_encode_buffer (dst , dst_size , src , src_size , scratch_buf );
98
- pfree (scratch_buf );
99
- return rc ;
146
+ return LZ4_compress (src , dst , src_size );
100
147
}
101
148
102
149
size_t cfs_decompress (void * dst , size_t dst_size , void const * src , size_t src_size )
103
150
{
104
- char * scratch_buf = palloc (lcfse_encode_scratch_size ());
105
- size_t rc = lcfse_decode_buffer (dst , dst_size , src , src_size , scratch_buf );
106
- pfree (scratch_buf );
107
- return rc ;
151
+ return LZ4_decompress_safe (src , dst , src_size , dst_size );
108
152
}
109
153
110
154
char const * cfs_algorithm ()
111
155
{
112
- return "lcfse " ;
156
+ return "lz4 " ;
113
157
}
114
158
115
- #elif CFS_COMPRESSOR == LZ4_COMPRESSOR
159
+ #elif CFS_COMPRESSOR == SNAPPY_COMPRESSOR
116
160
117
- #include <lz4 .h>
161
+ #include <snappy-c .h>
118
162
119
163
size_t cfs_compress (void * dst , size_t dst_size , void const * src , size_t src_size )
120
164
{
121
- return LZ4_compress (src , dst , src_size ) ;
165
+ return snappy_compress (src , src_size , dst , & dst_size ) == SNAPPY_OK ? dst_size : 0 ;
122
166
}
123
167
124
168
size_t cfs_decompress (void * dst , size_t dst_size , void const * src , size_t src_size )
125
169
{
126
- return LZ4_decompress_safe (src , dst , src_size , dst_size );
170
+ return snappy_uncompress (src , src_size , dst , & dst_size ) == SNAPPY_OK ? dst_size : 0 ;
127
171
}
128
172
129
173
char const * cfs_algorithm ()
130
174
{
131
- return "lz4 " ;
175
+ return "snappy " ;
132
176
}
133
177
134
- #elif CFS_COMPRESSOR == ZLIB_COMPRESSOR
178
+ #elif CFS_COMPRESSOR == LCFSE_COMPRESSOR
135
179
136
- #include <zlib .h>
180
+ #include <lcfse .h>
137
181
138
182
size_t cfs_compress (void * dst , size_t dst_size , void const * src , size_t src_size )
139
183
{
140
- uLongf compressed_size = dst_size ;
141
- int rc = compress2 (dst , & compressed_size , src , src_size , cfs_level );
142
- return rc == Z_OK ? compressed_size : rc ;
184
+ char * scratch_buf = palloc (lcfse_encode_scratch_size ());
185
+ size_t rc = lcfse_encode_buffer (dst , dst_size , src , src_size , scratch_buf );
186
+ pfree (scratch_buf );
187
+ return rc ;
143
188
}
144
189
145
190
size_t cfs_decompress (void * dst , size_t dst_size , void const * src , size_t src_size )
146
191
{
147
- uLongf dest_len = dst_size ;
148
- int rc = uncompress (dst , & dest_len , src , src_size );
149
- return rc == Z_OK ? dest_len : rc ;
192
+ char * scratch_buf = palloc (lcfse_encode_scratch_size ());
193
+ size_t rc = lcfse_decode_buffer (dst , dst_size , src , src_size , scratch_buf );
194
+ pfree (scratch_buf );
195
+ return rc ;
150
196
}
151
197
152
198
char const * cfs_algorithm ()
153
199
{
154
- return "zlib " ;
200
+ return "lcfse " ;
155
201
}
156
202
157
203
#elif CFS_COMPRESSOR == ZSTD_COMPRESSOR
@@ -195,6 +241,15 @@ char const* cfs_algorithm()
195
241
#endif
196
242
197
243
244
+ /* ----------------------------------------------------------------
245
+ * Section 2: Encryption related functionality.
246
+ *
247
+ * TODO
248
+ * - replace rc4 algrithm with something more appropriate
249
+ * - add more comments
250
+ * - what does 'offs' variable for?
251
+ * ----------------------------------------------------------------
252
+ */
198
253
static void cfs_rc4_encrypt_block (void * block , uint32 offs , uint32 block_size )
199
254
{
200
255
uint32 i ;
@@ -205,7 +260,7 @@ static void cfs_rc4_encrypt_block(void* block, uint32 offs, uint32 block_size)
205
260
int x = 0 , y = 0 ;
206
261
uint32 skip = (offs / BLCKSZ + block_size ) % CFS_CIPHER_KEY_SIZE ;
207
262
208
- memcpy (state , cfs_state -> rc4_init_state , CFS_CIPHER_KEY_SIZE );
263
+ memcpy (state , cfs_state -> cipher_key , CFS_CIPHER_KEY_SIZE );
209
264
for (i = 0 ; i < skip ; i ++ ) {
210
265
x = (x + 1 ) % CFS_CIPHER_KEY_SIZE ;
211
266
y = (y + state [x ]) % CFS_CIPHER_KEY_SIZE ;
@@ -224,7 +279,12 @@ static void cfs_rc4_encrypt_block(void* block, uint32 offs, uint32 block_size)
224
279
}
225
280
}
226
281
227
- static void cfs_rc4_init (void )
282
+ /*
283
+ * Get env variable PG_CIPHER_KEY and initialize encryption state.
284
+ * Unset variable afterward.
285
+ * Now implements cf4.
286
+ */
287
+ static void cfs_encrypt_init (void )
228
288
{
229
289
int index1 = 0 ;
230
290
int index2 = 0 ;
@@ -233,13 +293,13 @@ static void cfs_rc4_init(void)
233
293
int key_length ;
234
294
int x = 0 , y = 0 ;
235
295
char * cipher_key ;
236
- uint8 * rc4_init_state = cfs_state -> rc4_init_state ;
296
+ uint8 * rc4_init_state = cfs_state -> cipher_key ;
237
297
238
298
cipher_key = getenv ("PG_CIPHER_KEY" );
239
299
if (cipher_key == NULL ) {
240
300
elog (ERROR , "PG_CIPHER_KEY environment variable is not set" );
241
301
}
242
- unsetenv ("PG_CIPHER_KEY" ); /* make it not possible to inspect this environment variable through plperl */
302
+ unsetenv ("PG_CIPHER_KEY" ); /* disable inspection of this environment variable */
243
303
key_length = strlen (cipher_key );
244
304
for (i = 0 ; i < CFS_CIPHER_KEY_SIZE ; ++ i ) {
245
305
rc4_init_state [i ] = (uint8 )i ;
@@ -263,20 +323,21 @@ static void cfs_rc4_init(void)
263
323
void cfs_encrypt (void * block , uint32 offs , uint32 size )
264
324
{
265
325
if (cfs_encryption )
266
- {
267
326
cfs_rc4_encrypt_block (block , offs , size );
268
- }
269
327
}
270
328
271
329
void cfs_decrypt (void * block , uint32 offs , uint32 size )
272
330
{
273
331
if (cfs_encryption )
274
- {
275
332
cfs_rc4_encrypt_block (block , offs , size );
276
- }
277
333
}
278
334
279
-
335
+ /* ----------------------------------------------------------------
336
+ * Section 3: Compression implementation.
337
+ *
338
+ * TODO add description
339
+ * ----------------------------------------------------------------
340
+ */
280
341
void cfs_initialize ()
281
342
{
282
343
cfs_state = (CfsState * )ShmemAlloc (sizeof (CfsState ));
@@ -287,9 +348,9 @@ void cfs_initialize()
287
348
cfs_state -> gc_enabled = true;
288
349
cfs_state -> max_iterations = 0 ;
289
350
290
- if (cfs_encryption ) {
291
- cfs_rc4_init ();
292
- }
351
+ if (cfs_encryption )
352
+ cfs_encrypt_init ();
353
+
293
354
elog (LOG , "Start CFS version %s compression algorithm %s encryption %s" ,
294
355
CFS_VERSION , cfs_algorithm (), cfs_encryption ? "enabled" : "disabled" );
295
356
}
@@ -463,6 +524,12 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
463
524
return o1 < o2 ? -1 : o1 == o2 ? 0 : 1 ;
464
525
}
465
526
527
+ /* ----------------------------------------------------------------
528
+ * Section 4: Garbage collection functionality.
529
+ *
530
+ * TODO add description. reorder functions.
531
+ * ----------------------------------------------------------------
532
+ */
466
533
/*
467
534
* Perform garbage collection (if required) of file
468
535
* @param map_path path to file map file (*.cfm).
0 commit comments