Skip to content

gh-132983: Remove EndlessZstdDecompressor remains #133856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 30 additions & 150 deletions Modules/_zstd/decompressor.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,11 @@ typedef struct {
PyObject *unused_data;

/* 0 if decompressor has (or may has) unconsumed input data, 0 or 1. */
char needs_input;

/* For decompress(), 0 or 1.
1 when both input and output streams are at a frame edge, means a
frame is completely decoded and fully flushed, or the decompressor
just be initialized. */
char at_frame_edge;
bool needs_input;

/* For ZstdDecompressor, 0 or 1.
1 means the end of the first frame has been reached. */
char eof;

/* Used for fast reset above three variables */
char _unused_char_for_align;
bool eof;

/* __init__ has been called, 0 or 1. */
bool initialized;
Expand Down Expand Up @@ -258,19 +249,13 @@ _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict)
return 0;
}

typedef enum {
TYPE_DECOMPRESSOR, // <D>, ZstdDecompressor class
TYPE_ENDLESS_DECOMPRESSOR, // <E>, decompress() function
} decompress_type;

/*
Given the two types of decompressors (defined above),
decompress implementation for <D>, <E>, pseudo code:
Decompress implementation in pseudo code:

initialize_output_buffer
while True:
decompress_data
set_object_flag # .eof for <D>, .at_frame_edge for <E>.
set_object_flag # .eof

if output_buffer_exhausted:
if output_buffer_reached_max_length:
Expand All @@ -287,63 +272,19 @@ typedef enum {
flushing to do to complete current frame.

Note, decompressing "an empty input" in any case will make it > 0.

<E> supports multiple frames, has an .at_frame_edge flag, it means both the
input and output streams are at a frame edge. The flag can be set by this
statement:

.at_frame_edge = (zstd_ret == 0) ? 1 : 0

But if decompressing "an empty input" at "a frame edge", zstd_ret will be
non-zero, then .at_frame_edge will be wrongly set to false. To solve this
problem, two AFE checks are needed to ensure that: when at "a frame edge",
empty input will not be decompressed.

// AFE check
if (self->at_frame_edge && in->pos == in->size) {
finish
}

In <E>, if .at_frame_edge is eventually set to true, but input stream has
unconsumed data (in->pos < in->size), then the outer function
stream_decompress() will set .at_frame_edge to false. In this case,
although the output stream is at a frame edge, for the caller, the input
stream is not at a frame edge, see below diagram. This behavior does not
affect the next AFE check, since (in->pos < in->size).

input stream: --------------|---
^
output stream: ====================|
^
*/
static PyObject *
decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in,
Py_ssize_t max_length,
Py_ssize_t initial_size,
decompress_type type)
Py_ssize_t max_length)
{
size_t zstd_ret;
ZSTD_outBuffer out;
_BlocksOutputBuffer buffer = {.list = NULL};
PyObject *ret;

/* The first AFE check for setting .at_frame_edge flag */
if (type == TYPE_ENDLESS_DECOMPRESSOR) {
if (self->at_frame_edge && in->pos == in->size) {
return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
}
}

/* Initialize the output buffer */
if (initial_size >= 0) {
if (_OutputBuffer_InitWithSize(&buffer, &out, max_length, initial_size) < 0) {
goto error;
}
}
else {
if (_OutputBuffer_InitAndGrow(&buffer, &out, max_length) < 0) {
goto error;
}
if (_OutputBuffer_InitAndGrow(&buffer, &out, max_length) < 0) {
goto error;
}
assert(out.pos == 0);

Expand All @@ -362,22 +303,11 @@ decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in,
goto error;
}

/* Set .eof/.af_frame_edge flag */
if (type == TYPE_DECOMPRESSOR) {
/* ZstdDecompressor class stops when a frame is decompressed */
if (zstd_ret == 0) {
self->eof = 1;
break;
}
}
else if (type == TYPE_ENDLESS_DECOMPRESSOR) {
/* decompress() function supports multiple frames */
self->at_frame_edge = (zstd_ret == 0) ? 1 : 0;

/* The second AFE check for setting .at_frame_edge flag */
if (self->at_frame_edge && in->pos == in->size) {
break;
}
/* Set .eof flag */
if (zstd_ret == 0) {
/* Stop when a frame is decompressed */
self->eof = 1;
break;
}

/* Need to check out before in. Maybe zstd's internal buffer still has
Expand Down Expand Up @@ -415,8 +345,7 @@ decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in,
}

static void
decompressor_reset_session(ZstdDecompressor *self,
decompress_type type)
decompressor_reset_session(ZstdDecompressor *self)
{
// TODO(emmatyping): use _Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED here
// and ensure lock is always held
Expand All @@ -425,56 +354,28 @@ decompressor_reset_session(ZstdDecompressor *self,
self->in_begin = 0;
self->in_end = 0;

if (type == TYPE_DECOMPRESSOR) {
Py_CLEAR(self->unused_data);
}
Py_CLEAR(self->unused_data);

/* Reset variables in one operation */
self->needs_input = 1;
self->at_frame_edge = 1;
self->eof = 0;
self->_unused_char_for_align = 0;

/* Resetting session never fail */
/* Resetting session is guaranteed to never fail */
ZSTD_DCtx_reset(self->dctx, ZSTD_reset_session_only);
}

static PyObject *
stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length,
decompress_type type)
stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length)
{
Py_ssize_t initial_buffer_size = -1;
ZSTD_inBuffer in;
PyObject *ret = NULL;
int use_input_buffer;

if (type == TYPE_DECOMPRESSOR) {
/* Check .eof flag */
if (self->eof) {
PyErr_SetString(PyExc_EOFError, "Already at the end of a zstd frame.");
assert(ret == NULL);
goto success;
}
}
else if (type == TYPE_ENDLESS_DECOMPRESSOR) {
/* Fast path for the first frame */
if (self->at_frame_edge && self->in_begin == self->in_end) {
/* Read decompressed size */
uint64_t decompressed_size = ZSTD_getFrameContentSize(data->buf, data->len);

/* These two zstd constants always > PY_SSIZE_T_MAX:
ZSTD_CONTENTSIZE_UNKNOWN is (0ULL - 1)
ZSTD_CONTENTSIZE_ERROR is (0ULL - 2)

Use ZSTD_findFrameCompressedSize() to check complete frame,
prevent allocating too much memory for small input chunk. */

if (decompressed_size <= (uint64_t) PY_SSIZE_T_MAX &&
!ZSTD_isError(ZSTD_findFrameCompressedSize(data->buf, data->len)) )
{
initial_buffer_size = (Py_ssize_t) decompressed_size;
}
}
/* Check .eof flag */
if (self->eof) {
PyErr_SetString(PyExc_EOFError, "Already at the end of a zstd frame.");
assert(ret == NULL);
return NULL;
}

/* Prepare input buffer w/wo unconsumed data */
Expand Down Expand Up @@ -561,30 +462,18 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length
assert(in.pos == 0);

/* Decompress */
ret = decompress_impl(self, &in,
max_length, initial_buffer_size,
type);
ret = decompress_impl(self, &in, max_length);
if (ret == NULL) {
goto error;
}

/* Unconsumed input data */
if (in.pos == in.size) {
if (type == TYPE_DECOMPRESSOR) {
if (Py_SIZE(ret) == max_length || self->eof) {
self->needs_input = 0;
}
else {
self->needs_input = 1;
}
if (Py_SIZE(ret) == max_length || self->eof) {
self->needs_input = 0;
}
else if (type == TYPE_ENDLESS_DECOMPRESSOR) {
if (Py_SIZE(ret) == max_length && !self->at_frame_edge) {
self->needs_input = 0;
}
else {
self->needs_input = 1;
}
else {
self->needs_input = 1;
}

if (use_input_buffer) {
Expand All @@ -598,10 +487,6 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length

self->needs_input = 0;

if (type == TYPE_ENDLESS_DECOMPRESSOR) {
self->at_frame_edge = 0;
}

if (!use_input_buffer) {
/* Discard buffer if it's too small
(resizing it may needlessly copy the current contents) */
Expand Down Expand Up @@ -634,16 +519,14 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length
}
}

goto success;
return ret;

error:
/* Reset decompressor's states/session */
decompressor_reset_session(self, type);
decompressor_reset_session(self);

Py_CLEAR(ret);
success:

return ret;
return NULL;
}


Expand All @@ -668,9 +551,6 @@ _zstd_ZstdDecompressor_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
/* needs_input flag */
self->needs_input = 1;

/* at_frame_edge flag */
self->at_frame_edge = 1;

/* Decompression context */
self->dctx = ZSTD_createDCtx();
if (self->dctx == NULL) {
Expand Down Expand Up @@ -837,7 +717,7 @@ _zstd_ZstdDecompressor_decompress_impl(ZstdDecompressor *self,
/* Thread-safe code */
Py_BEGIN_CRITICAL_SECTION(self);

ret = stream_decompress(self, data, max_length, TYPE_DECOMPRESSOR);
ret = stream_decompress(self, data, max_length);
Py_END_CRITICAL_SECTION();
return ret;
}
Expand Down
Loading