Skip to content

Commit 7a1865c

Browse files
committed
Add zlib compression method
Signed-off-by: Ildus Kurbangaliev <i.kurbangaliev@gmail.com>
1 parent d7c018d commit 7a1865c

File tree

6 files changed

+262
-2
lines changed

6 files changed

+262
-2
lines changed

src/backend/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ OBJS = $(SUBDIROBJS) $(LOCALOBJS) $(top_builddir)/src/port/libpgport_srv.a \
4545
LIBS := $(filter-out -lpgport -lpgcommon, $(LIBS)) $(LDAP_LIBS_BE) $(ICU_LIBS)
4646

4747
# The backend doesn't need everything that's in LIBS, however
48-
LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
48+
LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
4949

5050
ifeq ($(with_systemd),yes)
5151
LIBS += -lsystemd

src/backend/access/compression/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ subdir = src/backend/access/compression
1212
top_builddir = ../../../..
1313
include $(top_builddir)/src/Makefile.global
1414

15-
OBJS = cm_pglz.o cmapi.o
15+
OBJS = cm_pglz.o cm_zlib.o cmapi.o
1616

1717
include $(top_srcdir)/src/backend/common.mk
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* cm_zlib.c
4+
* zlib compression method
5+
*
6+
* Copyright (c) 2015-2018, PostgreSQL Global Development Group
7+
*
8+
*
9+
* IDENTIFICATION
10+
* src/backend/access/compression/cm_zlib.c
11+
*
12+
*-------------------------------------------------------------------------
13+
*/
14+
#include "postgres.h"
15+
#include "access/cmapi.h"
16+
#include "commands/defrem.h"
17+
#include "nodes/parsenodes.h"
18+
#include "utils/builtins.h"
19+
20+
#ifdef HAVE_LIBZ
21+
#include <zlib.h>
22+
23+
#define ZLIB_MAX_DICTIONARY_LENGTH 32768
24+
#define ZLIB_DICTIONARY_DELIM (" ,")
25+
26+
typedef struct
27+
{
28+
int level;
29+
Bytef dict[ZLIB_MAX_DICTIONARY_LENGTH];
30+
unsigned int dictlen;
31+
} zlib_state;
32+
33+
/*
34+
* Check options if specified. All validation is located here so
35+
* we don't need do it again in cminitstate function.
36+
*/
37+
static void
38+
zlib_cmcheck(Form_pg_attribute att, List *options)
39+
{
40+
ListCell *lc;
41+
foreach(lc, options)
42+
{
43+
DefElem *def = (DefElem *) lfirst(lc);
44+
45+
if (strcmp(def->defname, "level") == 0)
46+
{
47+
if (strcmp(defGetString(def), "best_speed") != 0 &&
48+
strcmp(defGetString(def), "best_compression") != 0 &&
49+
strcmp(defGetString(def), "default") != 0)
50+
ereport(ERROR,
51+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
52+
errmsg("unexpected value for zlib compression level: \"%s\"", defGetString(def))));
53+
}
54+
else if (strcmp(def->defname, "dict") == 0)
55+
{
56+
int ntokens = 0;
57+
char *val,
58+
*tok;
59+
60+
val = pstrdup(defGetString(def));
61+
if (strlen(val) > (ZLIB_MAX_DICTIONARY_LENGTH - 1))
62+
ereport(ERROR,
63+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
64+
(errmsg("zlib dictionary length should be less than %d", ZLIB_MAX_DICTIONARY_LENGTH))));
65+
66+
while ((tok = strtok(val, ZLIB_DICTIONARY_DELIM)) != NULL)
67+
{
68+
ntokens++;
69+
val = NULL;
70+
}
71+
72+
if (ntokens < 2)
73+
ereport(ERROR,
74+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
75+
(errmsg("zlib dictionary is too small"))));
76+
}
77+
else
78+
ereport(ERROR,
79+
(errcode(ERRCODE_UNDEFINED_PARAMETER),
80+
errmsg("unexpected parameter for zlib: \"%s\"", def->defname)));
81+
}
82+
}
83+
84+
static void *
85+
zlib_cminitstate(Oid acoid, List *options)
86+
{
87+
zlib_state *state = NULL;
88+
89+
state = palloc0(sizeof(zlib_state));
90+
state->level = Z_DEFAULT_COMPRESSION;
91+
92+
if (list_length(options) > 0)
93+
{
94+
ListCell *lc;
95+
96+
foreach(lc, options)
97+
{
98+
DefElem *def = (DefElem *) lfirst(lc);
99+
100+
if (strcmp(def->defname, "level") == 0)
101+
{
102+
if (strcmp(defGetString(def), "best_speed") == 0)
103+
state->level = Z_BEST_SPEED;
104+
else if (strcmp(defGetString(def), "best_compression") == 0)
105+
state->level = Z_BEST_COMPRESSION;
106+
}
107+
else if (strcmp(def->defname, "dict") == 0)
108+
{
109+
char *val,
110+
*tok;
111+
112+
val = pstrdup(defGetString(def));
113+
114+
/* Fill the zlib dictionary */
115+
while ((tok = strtok(val, ZLIB_DICTIONARY_DELIM)) != NULL)
116+
{
117+
int len = strlen(tok);
118+
memcpy((void *) (state->dict + state->dictlen), tok, len);
119+
state->dictlen += len + 1;
120+
Assert(state->dictlen <= ZLIB_MAX_DICTIONARY_LENGTH);
121+
122+
/* add space as dictionary delimiter */
123+
state->dict[state->dictlen - 1] = ' ';
124+
val = NULL;
125+
}
126+
}
127+
}
128+
}
129+
130+
return state;
131+
}
132+
133+
static struct varlena *
134+
zlib_cmcompress(CompressionAmOptions *cmoptions, const struct varlena *value)
135+
{
136+
int32 valsize,
137+
len;
138+
struct varlena *tmp = NULL;
139+
z_streamp zp;
140+
int res;
141+
zlib_state *state = (zlib_state *) cmoptions->acstate;
142+
143+
zp = (z_streamp) palloc(sizeof(z_stream));
144+
zp->zalloc = Z_NULL;
145+
zp->zfree = Z_NULL;
146+
zp->opaque = Z_NULL;
147+
148+
if (deflateInit(zp, state->level) != Z_OK)
149+
elog(ERROR, "could not initialize compression library: %s", zp->msg);
150+
151+
if (state->dictlen > 0)
152+
{
153+
res = deflateSetDictionary(zp, state->dict, state->dictlen);
154+
if (res != Z_OK)
155+
elog(ERROR, "could not set dictionary for zlib: %s", zp->msg);
156+
}
157+
158+
valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
159+
tmp = (struct varlena *) palloc(valsize + VARHDRSZ_CUSTOM_COMPRESSED);
160+
zp->next_in = (void *) VARDATA_ANY(value);
161+
zp->avail_in = valsize;
162+
zp->avail_out = valsize;
163+
zp->next_out = (void *)((char *) tmp + VARHDRSZ_CUSTOM_COMPRESSED);
164+
165+
do {
166+
res = deflate(zp, Z_FINISH);
167+
if (res == Z_STREAM_ERROR)
168+
elog(ERROR, "could not compress data: %s", zp->msg);
169+
} while (zp->avail_in != 0);
170+
171+
Assert(res == Z_STREAM_END);
172+
173+
len = valsize - zp->avail_out;
174+
if (deflateEnd(zp) != Z_OK)
175+
elog(ERROR, "could not close compression stream: %s", zp->msg);
176+
pfree(zp);
177+
178+
if (len > 0)
179+
{
180+
SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_CUSTOM_COMPRESSED);
181+
return tmp;
182+
}
183+
184+
pfree(tmp);
185+
#endif
186+
return NULL;
187+
}
188+
189+
static struct varlena *
190+
zlib_cmdecompress(CompressionAmOptions *cmoptions, const struct varlena *value)
191+
{
192+
struct varlena *result;
193+
z_streamp zp;
194+
int res = Z_OK;
195+
zlib_state *state = (zlib_state *) cmoptions->acstate;
196+
197+
zp = (z_streamp) palloc(sizeof(z_stream));
198+
zp->zalloc = Z_NULL;
199+
zp->zfree = Z_NULL;
200+
zp->opaque = Z_NULL;
201+
202+
if (inflateInit(zp) != Z_OK)
203+
elog(ERROR, "could not initialize compression library: %s", zp->msg);
204+
205+
Assert(VARATT_IS_CUSTOM_COMPRESSED(value));
206+
zp->next_in = (void *) ((char *) value + VARHDRSZ_CUSTOM_COMPRESSED);
207+
zp->avail_in = VARSIZE(value) - VARHDRSZ_CUSTOM_COMPRESSED;
208+
zp->avail_out = VARRAWSIZE_4B_C(value);
209+
210+
result = (struct varlena *) palloc(zp->avail_out + VARHDRSZ);
211+
SET_VARSIZE(result, zp->avail_out + VARHDRSZ);
212+
zp->next_out = (void *) VARDATA(result);
213+
214+
while (zp->avail_in > 0)
215+
{
216+
res = inflate(zp, 0);
217+
if (res == Z_NEED_DICT && state->dictlen > 0)
218+
{
219+
res = inflateSetDictionary(zp, state->dict, state->dictlen);
220+
if (res != Z_OK)
221+
elog(ERROR, "could not set dictionary for zlib");
222+
continue;
223+
}
224+
if (!(res == Z_OK || res == Z_STREAM_END))
225+
elog(ERROR, "could not uncompress data: %s", zp->msg);
226+
}
227+
228+
if (inflateEnd(zp) != Z_OK)
229+
elog(ERROR, "could not close compression library: %s", zp->msg);
230+
231+
pfree(zp);
232+
return result;
233+
}
234+
235+
Datum
236+
zlibhandler(PG_FUNCTION_ARGS)
237+
{
238+
#ifndef HAVE_LIBZ
239+
ereport(ERROR,
240+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
241+
errmsg("not built with zlib support")));
242+
#else
243+
CompressionAmRoutine *routine = makeNode(CompressionAmRoutine);
244+
245+
routine->cmcheck = zlib_cmcheck;
246+
routine->cminitstate = zlib_cminitstate;
247+
routine->cmcompress = zlib_cmcompress;
248+
routine->cmdecompress = zlib_cmdecompress;
249+
250+
PG_RETURN_POINTER(routine);
251+
#endif
252+
}

src/include/catalog/pg_am.dat

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,8 @@
3636
{ oid => '4002', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
3737
descr => 'pglz compression access method',
3838
amname => 'pglz', amhandler => 'pglzhandler', amtype => 'c' },
39+
{ oid => '4011', oid_symbol => 'ZLIB_COMPRESSION_AM_OID',
40+
descr => 'zlib compression access method',
41+
amname => 'zlib', amhandler => 'zlibhandler', amtype => 'c' },
3942

4043
]

src/include/catalog/pg_attr_compression.dat

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@
1919
[
2020

2121
{ acoid => '4002', acname => 'pglz' },
22+
{ acoid => '4011', acname => 'zlib' },
2223

2324
]

src/include/catalog/pg_proc.dat

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,10 @@
914914
proname => 'pglzhandler', provolatile => 'v',
915915
prorettype => 'compression_am_handler', proargtypes => 'internal',
916916
prosrc => 'pglzhandler' },
917+
{ oid => '4010', descr => 'zlib compression access method handler',
918+
proname => 'zlibhandler', provolatile => 'v',
919+
prorettype => 'compression_am_handler', proargtypes => 'internal',
920+
prosrc => 'zlibhandler' },
917921

918922
{ oid => '338', descr => 'validate an operator class',
919923
proname => 'amvalidate', provolatile => 'v', prorettype => 'bool',

0 commit comments

Comments
 (0)