From daa5f438523ff7ce6b7510342e36607d3a526bee Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Mon, 19 May 2025 17:28:45 -0400 Subject: [PATCH 01/18] Use locks instead of critical sections Also remove critical sections around code only run in initializer. --- Lib/test/test_zstd.py | 1 - Modules/_zstd/compressor.c | 38 ++++++++++++++++-------------- Modules/_zstd/decompressor.c | 45 ++++++++++++++++++------------------ 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index 53ca592ea38828..541db4441b035c 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -2430,7 +2430,6 @@ def test_buffer_protocol(self): self.assertEqual(f.write(arr), LENGTH) self.assertEqual(f.tell(), LENGTH) -@unittest.skip("it fails for now, see gh-133885") class FreeThreadingMethodTests(unittest.TestCase): @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled') diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 38baee2be1e95b..8dca57fa253949 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -17,6 +17,7 @@ class _zstd.ZstdCompressor "ZstdCompressor *" "&zstd_compressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" #include "zstddict.h" +#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // offsetof() #include // ZSTD_*() @@ -38,6 +39,9 @@ typedef struct { /* Compression level */ int compression_level; + + /* Lock to protect the compression context */ + PyMutex lock; } ZstdCompressor; #define ZstdCompressor_CAST(op) ((ZstdCompressor *)op) @@ -276,28 +280,22 @@ _zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) } /* Reference a prepared dictionary. It overrides some compression context's parameters. */ - Py_BEGIN_CRITICAL_SECTION(self); zstd_ret = ZSTD_CCtx_refCDict(self->cctx, c_dict); - Py_END_CRITICAL_SECTION(); } else if (type == DICT_TYPE_UNDIGESTED) { /* Load a dictionary. It doesn't override compression context's parameters. */ - Py_BEGIN_CRITICAL_SECTION2(self, zd); zstd_ret = ZSTD_CCtx_loadDictionary( self->cctx, PyBytes_AS_STRING(zd->dict_content), Py_SIZE(zd->dict_content)); - Py_END_CRITICAL_SECTION2(); } else if (type == DICT_TYPE_PREFIX) { /* Load a prefix */ - Py_BEGIN_CRITICAL_SECTION2(self, zd); zstd_ret = ZSTD_CCtx_refPrefix( self->cctx, PyBytes_AS_STRING(zd->dict_content), Py_SIZE(zd->dict_content)); - Py_END_CRITICAL_SECTION2(); } else { Py_UNREACHABLE(); @@ -339,6 +337,7 @@ _zstd_ZstdCompressor_new_impl(PyTypeObject *type, PyObject *level, self->use_multithread = 0; self->dict = NULL; + self->lock = (PyMutex){0}; /* Compression context */ self->cctx = ZSTD_createCCtx(); @@ -403,6 +402,10 @@ ZstdCompressor_dealloc(PyObject *ob) ZSTD_freeCCtx(self->cctx); } + if (PyMutex_IsLocked(&self->lock)) { + PyMutex_Unlock(&self->lock); + } + /* Py_XDECREF the dict after free the compression context */ Py_CLEAR(self->dict); @@ -412,8 +415,8 @@ ZstdCompressor_dealloc(PyObject *ob) } static PyObject * -compress_impl(ZstdCompressor *self, Py_buffer *data, - ZSTD_EndDirective end_directive) +compress_lock_held(ZstdCompressor *self, Py_buffer *data, + ZSTD_EndDirective end_directive) { ZSTD_inBuffer in; ZSTD_outBuffer out; @@ -495,7 +498,7 @@ mt_continue_should_break(ZSTD_inBuffer *in, ZSTD_outBuffer *out) #endif static PyObject * -compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data) +compress_mt_continue_lock_held(ZstdCompressor *self, Py_buffer *data) { ZSTD_inBuffer in; ZSTD_outBuffer out; @@ -529,7 +532,7 @@ compress_mt_continue_impl(ZstdCompressor *self, Py_buffer *data) goto error; } - /* Like compress_impl(), output as much as possible. */ + /* Like compress_lock_held(), output as much as possible. */ if (out.pos == out.size) { if (_OutputBuffer_Grow(&buffer, &out) < 0) { goto error; @@ -588,14 +591,14 @@ _zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data, } /* Thread-safe code */ - Py_BEGIN_CRITICAL_SECTION(self); + PyMutex_Lock(&self->lock); /* Compress */ if (self->use_multithread && mode == ZSTD_e_continue) { - ret = compress_mt_continue_impl(self, data); + ret = compress_mt_continue_lock_held(self, data); } else { - ret = compress_impl(self, data, mode); + ret = compress_lock_held(self, data, mode); } if (ret) { @@ -607,7 +610,7 @@ _zstd_ZstdCompressor_compress_impl(ZstdCompressor *self, Py_buffer *data, /* Resetting cctx's session never fail */ ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only); } - Py_END_CRITICAL_SECTION(); + PyMutex_Unlock(&self->lock); return ret; } @@ -642,8 +645,9 @@ _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode) } /* Thread-safe code */ - Py_BEGIN_CRITICAL_SECTION(self); - ret = compress_impl(self, NULL, mode); + PyMutex_Lock(&self->lock); + + ret = compress_lock_held(self, NULL, mode); if (ret) { self->last_mode = mode; @@ -654,7 +658,7 @@ _zstd_ZstdCompressor_flush_impl(ZstdCompressor *self, int mode) /* Resetting cctx's session never fail */ ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only); } - Py_END_CRITICAL_SECTION(); + PyMutex_Unlock(&self->lock); return ret; } diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index 58f9c9f804e549..9672592aa70d8c 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -17,6 +17,7 @@ class _zstd.ZstdDecompressor "ZstdDecompressor *" "&zstd_decompressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" #include "zstddict.h" +#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // bool #include // offsetof() @@ -45,6 +46,9 @@ typedef struct { /* For ZstdDecompressor, 0 or 1. 1 means the end of the first frame has been reached. */ bool eof; + + /* Lock to protect the decompression context */ + PyMutex lock; } ZstdDecompressor; #define ZstdDecompressor_CAST(op) ((ZstdDecompressor *)op) @@ -61,7 +65,6 @@ _get_DDict(ZstdDict *self) return self->d_dict; } - Py_BEGIN_CRITICAL_SECTION(self); if (self->d_dict == NULL) { /* Create ZSTD_DDict instance from dictionary content */ char *dict_buffer = PyBytes_AS_STRING(self->dict_content); @@ -83,7 +86,6 @@ _get_DDict(ZstdDict *self) /* Don't lose any exception */ ret = self->d_dict; - Py_END_CRITICAL_SECTION(); return ret; } @@ -134,9 +136,7 @@ _zstd_set_d_parameters(ZstdDecompressor *self, PyObject *options) } /* Set parameter to compression context */ - Py_BEGIN_CRITICAL_SECTION(self); zstd_ret = ZSTD_DCtx_setParameter(self->dctx, key_v, value_v); - Py_END_CRITICAL_SECTION(); /* Check error */ if (ZSTD_isError(zstd_ret)) { @@ -206,27 +206,21 @@ _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict) return -1; } /* Reference a prepared dictionary */ - Py_BEGIN_CRITICAL_SECTION(self); zstd_ret = ZSTD_DCtx_refDDict(self->dctx, d_dict); - Py_END_CRITICAL_SECTION(); } else if (type == DICT_TYPE_UNDIGESTED) { /* Load a dictionary */ - Py_BEGIN_CRITICAL_SECTION2(self, zd); zstd_ret = ZSTD_DCtx_loadDictionary( self->dctx, PyBytes_AS_STRING(zd->dict_content), Py_SIZE(zd->dict_content)); - Py_END_CRITICAL_SECTION2(); } else if (type == DICT_TYPE_PREFIX) { /* Load a prefix */ - Py_BEGIN_CRITICAL_SECTION2(self, zd); zstd_ret = ZSTD_DCtx_refPrefix( self->dctx, PyBytes_AS_STRING(zd->dict_content), Py_SIZE(zd->dict_content)); - Py_END_CRITICAL_SECTION2(); } else { /* Impossible code path */ @@ -268,8 +262,8 @@ _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict) Note, decompressing "an empty input" in any case will make it > 0. */ static PyObject * -decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in, - Py_ssize_t max_length) +decompress_lock_held(ZstdDecompressor *self, ZSTD_inBuffer *in, + Py_ssize_t max_length) { size_t zstd_ret; ZSTD_outBuffer out; @@ -339,10 +333,8 @@ decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in, } static void -decompressor_reset_session(ZstdDecompressor *self) +decompressor_reset_session_lock_held(ZstdDecompressor *self) { - // TODO(emmatyping): use _Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED here - // and ensure lock is always held /* Reset variables */ self->in_begin = 0; @@ -359,7 +351,8 @@ decompressor_reset_session(ZstdDecompressor *self) } static PyObject * -stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length) +stream_decompress_lock_held(ZstdDecompressor *self, Py_buffer *data, + Py_ssize_t max_length) { ZSTD_inBuffer in; PyObject *ret = NULL; @@ -456,7 +449,7 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length assert(in.pos == 0); /* Decompress */ - ret = decompress_impl(self, &in, max_length); + ret = decompress_lock_held(self, &in, max_length); if (ret == NULL) { goto error; } @@ -517,7 +510,7 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length error: /* Reset decompressor's states/session */ - decompressor_reset_session(self); + decompressor_reset_session_lock_held(self); Py_CLEAR(ret); return NULL; @@ -555,6 +548,7 @@ _zstd_ZstdDecompressor_new_impl(PyTypeObject *type, PyObject *zstd_dict, self->unused_data = NULL; self->eof = 0; self->dict = NULL; + self->lock = (PyMutex){0}; /* needs_input flag */ self->needs_input = 1; @@ -608,6 +602,10 @@ ZstdDecompressor_dealloc(PyObject *ob) ZSTD_freeDCtx(self->dctx); } + if (PyMutex_IsLocked(&self->lock)) { + PyMutex_Unlock(&self->lock); + } + /* Py_CLEAR the dict after free decompression context */ Py_CLEAR(self->dict); @@ -639,7 +637,10 @@ _zstd_ZstdDecompressor_unused_data_get_impl(ZstdDecompressor *self) { PyObject *ret; + PyMutex_Lock(&self->lock); + if (!self->eof) { + PyMutex_Unlock(&self->lock); return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES); } else { @@ -656,6 +657,7 @@ _zstd_ZstdDecompressor_unused_data_get_impl(ZstdDecompressor *self) } } + PyMutex_Unlock(&self->lock); return ret; } @@ -693,10 +695,9 @@ _zstd_ZstdDecompressor_decompress_impl(ZstdDecompressor *self, { PyObject *ret; /* Thread-safe code */ - Py_BEGIN_CRITICAL_SECTION(self); - - ret = stream_decompress(self, data, max_length); - Py_END_CRITICAL_SECTION(); + PyMutex_Lock(&self->lock); + ret = stream_decompress_lock_held(self, data, max_length); + PyMutex_Unlock(&self->lock); return ret; } From 0d14dc26e8c41c6612e7b854319ffe3433f3137e Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Mon, 19 May 2025 17:39:39 -0400 Subject: [PATCH 02/18] Remove one more critical section --- Modules/_zstd/compressor.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 8dca57fa253949..f35b2b7700365c 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -157,8 +157,6 @@ _get_CDict(ZstdDict *self, int compressionLevel) PyObject *capsule; ZSTD_CDict *cdict; - // TODO(emmatyping): refactor critical section code into a lock_held function - Py_BEGIN_CRITICAL_SECTION(self); /* int level object */ level = PyLong_FromLong(compressionLevel); @@ -216,7 +214,6 @@ _get_CDict(ZstdDict *self, int compressionLevel) cdict = NULL; success: Py_XDECREF(level); - Py_END_CRITICAL_SECTION(); return cdict; } From 1b68e1e098e201c6666e4fb2bcf48cf06678db94 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Tue, 20 May 2025 17:28:59 -0400 Subject: [PATCH 03/18] Make capsule management threadsafe Using PyDict_SetDefaultRef allows us to not have contention on the capsules dictionary shared by ZstdDict instances. --- Modules/_zstd/compressor.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index f35b2b7700365c..03853202c33835 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -165,12 +165,12 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Get PyCapsule object from self->c_dicts */ - capsule = PyDict_GetItemWithError(self->c_dicts, level); - if (capsule == NULL) { - if (PyErr_Occurred()) { - goto error; - } + int result = PyDict_GetItemRef(self->c_dicts, level, &capsule); + if (result < 0) { + goto error; + } + if (capsule == NULL) { /* Create ZSTD_CDict instance */ char *dict_buffer = PyBytes_AS_STRING(self->dict_content); Py_ssize_t dict_len = Py_SIZE(self->dict_content); @@ -197,16 +197,19 @@ _get_CDict(ZstdDict *self, int compressionLevel) goto error; } - /* Add PyCapsule object to self->c_dicts */ - if (PyDict_SetItem(self->c_dicts, level, capsule) < 0) { - Py_DECREF(capsule); + /* Add PyCapsule object to self->c_dicts if not already inserted */ + PyObject *capsule_value; + int result = PyDict_SetDefaultRef(self->c_dicts, level, capsule, + &capsule_value); + if (result < 0) { goto error; } - Py_DECREF(capsule); + Py_XDECREF(capsule_value); } else { /* ZSTD_CDict instance already exists */ cdict = PyCapsule_GetPointer(capsule, NULL); + Py_DECREF(capsule); } goto success; From f6602f4b5a15d4e2c8b7536fc2202d8d27c858c3 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Tue, 20 May 2025 17:59:05 -0400 Subject: [PATCH 04/18] Add a test for sharing a dictionary across threads --- Lib/test/test_zstd.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index 541db4441b035c..bf8afb7feb85d6 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -2506,6 +2506,28 @@ def run_method(method, input_data, output_data): self.assertEqual(expected, actual) + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_compress_shared_dict(self): + num_threads = 8 + + def run_method(b): + level = threading.get_ident() % 2 + # sync threads to increase chance of contention on + # capsule storing dictionary levels + b.wait() + ZstdCompressor(level=level, zstd_dict=TRAINED_DICT.as_digested_dict) + threads = [] + + b = threading.Barrier(num_threads) + for i in range(num_threads): + thread = threading.Thread(target=run_method, args=(b,)) + + threads.append(thread) + + with threading_helper.start_threads(threads): + pass + if __name__ == "__main__": unittest.main() From d8854c53a2ef11b73b96748f34124f5d93ea0d22 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Tue, 20 May 2025 18:00:25 -0400 Subject: [PATCH 05/18] Move lock check in dealloc to an assert --- Modules/_zstd/compressor.c | 4 +--- Modules/_zstd/decompressor.c | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 03853202c33835..27d09b24313cf6 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -402,9 +402,7 @@ ZstdCompressor_dealloc(PyObject *ob) ZSTD_freeCCtx(self->cctx); } - if (PyMutex_IsLocked(&self->lock)) { - PyMutex_Unlock(&self->lock); - } + assert(!PyMutex_IsLocked(&self->lock)); /* Py_XDECREF the dict after free the compression context */ Py_CLEAR(self->dict); diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index 9672592aa70d8c..ea2017fc29a2f2 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -602,9 +602,7 @@ ZstdDecompressor_dealloc(PyObject *ob) ZSTD_freeDCtx(self->dctx); } - if (PyMutex_IsLocked(&self->lock)) { - PyMutex_Unlock(&self->lock); - } + assert(!PyMutex_IsLocked(&self->lock)); /* Py_CLEAR the dict after free decompression context */ Py_CLEAR(self->dict); From 02fc2b3e8ecba886c9e42dccef792ec913a2499f Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Tue, 20 May 2025 18:01:45 -0400 Subject: [PATCH 06/18] Run threading checks on GIL builds too --- Lib/test/test_zstd.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index bf8afb7feb85d6..ef784e1733652c 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -2432,7 +2432,6 @@ def test_buffer_protocol(self): class FreeThreadingMethodTests(unittest.TestCase): - @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled') @threading_helper.reap_threads @threading_helper.requires_working_threading() def test_compress_locking(self): @@ -2469,7 +2468,6 @@ def run_method(method, input_data, output_data): actual = b''.join(output) + rest2 self.assertEqual(expected, actual) - @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled') @threading_helper.reap_threads @threading_helper.requires_working_threading() def test_decompress_locking(self): From edbf444b6b289af6c1e766380efd59c8ec3cf10a Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Tue, 20 May 2025 18:02:11 -0400 Subject: [PATCH 07/18] Remove extra blank line --- Lib/test/test_zstd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index ef784e1733652c..87d6ade6d762ce 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -2503,7 +2503,6 @@ def run_method(method, input_data, output_data): actual = b''.join(output) self.assertEqual(expected, actual) - @threading_helper.reap_threads @threading_helper.requires_working_threading() def test_compress_shared_dict(self): From bf287cfaf35ae17d1d579e241f5e799101e02cc8 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Tue, 20 May 2025 18:05:27 -0400 Subject: [PATCH 08/18] Remove extraneous whitespace --- Lib/test/test_zstd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index 87d6ade6d762ce..df0ffa185b5363 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -2507,7 +2507,7 @@ def run_method(method, input_data, output_data): @threading_helper.requires_working_threading() def test_compress_shared_dict(self): num_threads = 8 - + def run_method(b): level = threading.get_ident() % 2 # sync threads to increase chance of contention on From 005d1a06ba8cd1fcef01f4fbcc57b010657f0f7e Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 00:10:54 -0400 Subject: [PATCH 09/18] Remove unused includes --- Modules/_zstd/compressor.c | 1 - Modules/_zstd/decompressor.c | 1 - 2 files changed, 2 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 27d09b24313cf6..3685160f1a2911 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -17,7 +17,6 @@ class _zstd.ZstdCompressor "ZstdCompressor *" "&zstd_compressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" #include "zstddict.h" -#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // offsetof() #include // ZSTD_*() diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index ea2017fc29a2f2..af9e55315f29ad 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -17,7 +17,6 @@ class _zstd.ZstdDecompressor "ZstdDecompressor *" "&zstd_decompressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" #include "zstddict.h" -#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // bool #include // offsetof() From 84c5b3065f22cd4156bab9cd3f18ec8e3af421b6 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 00:13:32 -0400 Subject: [PATCH 10/18] Revert "Remove unused includes" This reverts commit 005d1a06ba8cd1fcef01f4fbcc57b010657f0f7e. --- Modules/_zstd/compressor.c | 1 + Modules/_zstd/decompressor.c | 1 + 2 files changed, 2 insertions(+) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 3685160f1a2911..27d09b24313cf6 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -17,6 +17,7 @@ class _zstd.ZstdCompressor "ZstdCompressor *" "&zstd_compressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" #include "zstddict.h" +#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // offsetof() #include // ZSTD_*() diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index af9e55315f29ad..ea2017fc29a2f2 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -17,6 +17,7 @@ class _zstd.ZstdDecompressor "ZstdDecompressor *" "&zstd_decompressor_type_spec" #include "_zstdmodule.h" #include "buffer.h" #include "zstddict.h" +#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // bool #include // offsetof() From ec310194c1149f6bf4263bb6a45897e7d909c412 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 09:46:34 -0400 Subject: [PATCH 11/18] Put locks around ZstdDict usage This also reverts the setdefault usage which we don't need anymore and refactors the _zstd_load_(c,d)_dict functions to not use goto/properly lock around dictionary usage. --- Modules/_zstd/compressor.c | 109 ++++++++++++++++++----------------- Modules/_zstd/decompressor.c | 92 ++++++++++++++++------------- Modules/_zstd/zstddict.c | 3 + Modules/_zstd/zstddict.h | 3 + 4 files changed, 114 insertions(+), 93 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 27d09b24313cf6..7e460c4a77ce86 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -165,12 +165,11 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Get PyCapsule object from self->c_dicts */ - int result = PyDict_GetItemRef(self->c_dicts, level, &capsule); - if (result < 0) { - goto error; - } - + capsule = PyDict_GetItemWithError(self->c_dicts, level); if (capsule == NULL) { + if (PyErr_Occurred()) { + goto error; + } /* Create ZSTD_CDict instance */ char *dict_buffer = PyBytes_AS_STRING(self->dict_content); Py_ssize_t dict_len = Py_SIZE(self->dict_content); @@ -198,18 +197,15 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Add PyCapsule object to self->c_dicts if not already inserted */ - PyObject *capsule_value; - int result = PyDict_SetDefaultRef(self->c_dicts, level, capsule, - &capsule_value); - if (result < 0) { + if (PyDict_SetItem(self->c_dicts, level, capsule) < 0) { + Py_DECREF(capsule); goto error; } - Py_XDECREF(capsule_value); + Py_DECREF(capsule); } else { /* ZSTD_CDict instance already exists */ cdict = PyCapsule_GetPointer(capsule, NULL); - Py_DECREF(capsule); } goto success; @@ -221,10 +217,50 @@ _get_CDict(ZstdDict *self, int compressionLevel) } static int -_zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) +_zstd_load_impl(ZstdCompressor *self, ZstdDict *zd, + _zstd_state *mod_state, int type) { - size_t zstd_ret; + if (type == DICT_TYPE_DIGESTED) { + /* Get ZSTD_CDict */ + ZSTD_CDict *c_dict = _get_CDict(zd, self->compression_level); + if (c_dict == NULL) { + return -1; + } + /* Reference a prepared dictionary. + It overrides some compression context's parameters. */ + zstd_ret = ZSTD_CCtx_refCDict(self->cctx, c_dict); + } + else if (type == DICT_TYPE_UNDIGESTED) { + /* Load a dictionary. + It doesn't override compression context's parameters. */ + zstd_ret = ZSTD_CCtx_loadDictionary( + self->cctx, + PyBytes_AS_STRING(zd->dict_content), + Py_SIZE(zd->dict_content)); + } + else if (type == DICT_TYPE_PREFIX) { + /* Load a prefix */ + zstd_ret = ZSTD_CCtx_refPrefix( + self->cctx, + PyBytes_AS_STRING(zd->dict_content), + Py_SIZE(zd->dict_content)); + } + else { + Py_UNREACHABLE(); + } + + /* Check error */ + if (ZSTD_isError(zstd_ret)) { + set_zstd_error(mod_state, ERR_LOAD_C_DICT, zstd_ret); + return -1; + } + return 0; +} + +static int +_zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) +{ _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); if (mod_state == NULL) { return -1; @@ -241,7 +277,10 @@ _zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) /* When compressing, use undigested dictionary by default. */ zd = (ZstdDict*)dict; type = DICT_TYPE_UNDIGESTED; - goto load; + PyMutex_Lock(&zd->lock); + ret = _zstd_load_impl(self, zd, mod_state, type); + PyMutex_Unlock(&zd->lock); + return ret; } /* Check (ZstdDict, type) */ @@ -261,7 +300,10 @@ _zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) { assert(type >= 0); zd = (ZstdDict*)PyTuple_GET_ITEM(dict, 0); - goto load; + PyMutex_Lock(&zd->lock); + ret = _zstd_load_impl(self, zd, mod_state, type); + PyMutex_Unlock(&zd->lock); + return ret; } } } @@ -270,43 +312,6 @@ _zstd_load_c_dict(ZstdCompressor *self, PyObject *dict) PyErr_SetString(PyExc_TypeError, "zstd_dict argument should be ZstdDict object."); return -1; - -load: - if (type == DICT_TYPE_DIGESTED) { - /* Get ZSTD_CDict */ - ZSTD_CDict *c_dict = _get_CDict(zd, self->compression_level); - if (c_dict == NULL) { - return -1; - } - /* Reference a prepared dictionary. - It overrides some compression context's parameters. */ - zstd_ret = ZSTD_CCtx_refCDict(self->cctx, c_dict); - } - else if (type == DICT_TYPE_UNDIGESTED) { - /* Load a dictionary. - It doesn't override compression context's parameters. */ - zstd_ret = ZSTD_CCtx_loadDictionary( - self->cctx, - PyBytes_AS_STRING(zd->dict_content), - Py_SIZE(zd->dict_content)); - } - else if (type == DICT_TYPE_PREFIX) { - /* Load a prefix */ - zstd_ret = ZSTD_CCtx_refPrefix( - self->cctx, - PyBytes_AS_STRING(zd->dict_content), - Py_SIZE(zd->dict_content)); - } - else { - Py_UNREACHABLE(); - } - - /* Check error */ - if (ZSTD_isError(zstd_ret)) { - set_zstd_error(mod_state, ERR_LOAD_C_DICT, zstd_ret); - return -1; - } - return 0; } /*[clinic input] diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index ea2017fc29a2f2..73f8fb3e5176a9 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -147,11 +147,53 @@ _zstd_set_d_parameters(ZstdDecompressor *self, PyObject *options) return 0; } +static int +_zstd_load_impl(ZstdDecompressor *self, ZstdDict *zd, + _zstd_state *mod_state, int type) +{ + size_t zstd_ret; + if (type == DICT_TYPE_DIGESTED) { + /* Get ZSTD_DDict */ + ZSTD_DDict *d_dict = _get_DDict(zd); + if (d_dict == NULL) { + return -1; + } + /* Reference a prepared dictionary */ + zstd_ret = ZSTD_DCtx_refDDict(self->dctx, d_dict); + } + else if (type == DICT_TYPE_UNDIGESTED) { + /* Load a dictionary */ + zstd_ret = ZSTD_DCtx_loadDictionary( + self->dctx, + PyBytes_AS_STRING(zd->dict_content), + Py_SIZE(zd->dict_content)); + } + else if (type == DICT_TYPE_PREFIX) { + /* Load a prefix */ + zstd_ret = ZSTD_DCtx_refPrefix( + self->dctx, + PyBytes_AS_STRING(zd->dict_content), + Py_SIZE(zd->dict_content)); + } + else { + /* Impossible code path */ + PyErr_SetString(PyExc_SystemError, + "load_d_dict() impossible code path"); + return -1; + } + + /* Check error */ + if (ZSTD_isError(zstd_ret)) { + set_zstd_error(mod_state, ERR_LOAD_D_DICT, zstd_ret); + return -1; + } + return 0; +} + /* Load dictionary or prefix to decompression context */ static int _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict) { - size_t zstd_ret; _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); if (mod_state == NULL) { return -1; @@ -168,7 +210,10 @@ _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict) /* When decompressing, use digested dictionary by default. */ zd = (ZstdDict*)dict; type = DICT_TYPE_DIGESTED; - goto load; + PyMutex_Lock(&zd->lock); + ret = _zstd_load_impl(self, zd, mod_state, type); + PyMutex_Unlock(&zd->lock); + return ret; } /* Check (ZstdDict, type) */ @@ -188,7 +233,10 @@ _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict) { assert(type >= 0); zd = (ZstdDict*)PyTuple_GET_ITEM(dict, 0); - goto load; + PyMutex_Lock(&zd->lock); + ret = _zstd_load_impl(self, zd, mod_state, type); + PyMutex_Unlock(&zd->lock); + return ret; } } } @@ -197,44 +245,6 @@ _zstd_load_d_dict(ZstdDecompressor *self, PyObject *dict) PyErr_SetString(PyExc_TypeError, "zstd_dict argument should be ZstdDict object."); return -1; - -load: - if (type == DICT_TYPE_DIGESTED) { - /* Get ZSTD_DDict */ - ZSTD_DDict *d_dict = _get_DDict(zd); - if (d_dict == NULL) { - return -1; - } - /* Reference a prepared dictionary */ - zstd_ret = ZSTD_DCtx_refDDict(self->dctx, d_dict); - } - else if (type == DICT_TYPE_UNDIGESTED) { - /* Load a dictionary */ - zstd_ret = ZSTD_DCtx_loadDictionary( - self->dctx, - PyBytes_AS_STRING(zd->dict_content), - Py_SIZE(zd->dict_content)); - } - else if (type == DICT_TYPE_PREFIX) { - /* Load a prefix */ - zstd_ret = ZSTD_DCtx_refPrefix( - self->dctx, - PyBytes_AS_STRING(zd->dict_content), - Py_SIZE(zd->dict_content)); - } - else { - /* Impossible code path */ - PyErr_SetString(PyExc_SystemError, - "load_d_dict() impossible code path"); - return -1; - } - - /* Check error */ - if (ZSTD_isError(zstd_ret)) { - set_zstd_error(mod_state, ERR_LOAD_D_DICT, zstd_ret); - return -1; - } - return 0; } /* diff --git a/Modules/_zstd/zstddict.c b/Modules/_zstd/zstddict.c index 7df187a6fa69d7..aabb7ea0dc97a0 100644 --- a/Modules/_zstd/zstddict.c +++ b/Modules/_zstd/zstddict.c @@ -53,6 +53,7 @@ _zstd_ZstdDict_new_impl(PyTypeObject *type, PyObject *dict_content, self->dict_content = NULL; self->d_dict = NULL; self->dict_id = 0; + self->lock = (PyMutex){0}; /* ZSTD_CDict dict */ self->c_dicts = PyDict_New(); @@ -109,6 +110,8 @@ ZstdDict_dealloc(PyObject *ob) ZSTD_freeDDict(self->d_dict); } + assert(!PyMutex_IsLocked(&self->lock)); + /* Release dict_content after Free ZSTD_CDict/ZSTD_DDict instances */ Py_CLEAR(self->dict_content); Py_CLEAR(self->c_dicts); diff --git a/Modules/_zstd/zstddict.h b/Modules/_zstd/zstddict.h index e8a55a3670b869..dcba0f21852087 100644 --- a/Modules/_zstd/zstddict.h +++ b/Modules/_zstd/zstddict.h @@ -19,6 +19,9 @@ typedef struct { PyObject *dict_content; /* Dictionary id */ uint32_t dict_id; + + /* Lock to protect the digested dictionaries */ + PyMutex lock; } ZstdDict; #endif // !ZSTD_DICT_H From 2dbed4eb8baa05ca7fa06a87509e896a24aff62c Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 09:48:45 -0400 Subject: [PATCH 12/18] Add tests for sharing all types of ZstdDict --- Lib/test/test_zstd.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index df0ffa185b5363..084f8f24fc009c 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -2509,11 +2509,43 @@ def test_compress_shared_dict(self): num_threads = 8 def run_method(b): - level = threading.get_ident() % 2 + level = threading.get_ident() % 4 # sync threads to increase chance of contention on # capsule storing dictionary levels b.wait() - ZstdCompressor(level=level, zstd_dict=TRAINED_DICT.as_digested_dict) + ZstdCompressor(level=level, + zstd_dict=TRAINED_DICT.as_digested_dict) + b.wait() + ZstdCompressor(level=level, + zstd_dict=TRAINED_DICT.as_undigested_dict) + b.wait() + ZstdCompressor(level=level, + zstd_dict=TRAINED_DICT.as_prefix) + threads = [] + + b = threading.Barrier(num_threads) + for i in range(num_threads): + thread = threading.Thread(target=run_method, args=(b,)) + + threads.append(thread) + + with threading_helper.start_threads(threads): + pass + + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_decompress_shared_dict(self): + num_threads = 8 + + def run_method(b): + # sync threads to increase chance of contention on + # decompression dictionary + b.wait() + ZstdDecompressor(zstd_dict=TRAINED_DICT.as_digested_dict) + b.wait() + ZstdDecompressor(zstd_dict=TRAINED_DICT.as_undigested_dict) + b.wait() + ZstdDecompressor(zstd_dict=TRAINED_DICT.as_prefix) threads = [] b = threading.Barrier(num_threads) From afdabd5408890315414873eac393a6ff6216549d Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 09:50:09 -0400 Subject: [PATCH 13/18] Remove @critical_section --- Modules/_zstd/clinic/zstddict.c.h | 27 ++++----------------------- Modules/_zstd/zstddict.c | 9 +++------ 2 files changed, 7 insertions(+), 29 deletions(-) diff --git a/Modules/_zstd/clinic/zstddict.c.h b/Modules/_zstd/clinic/zstddict.c.h index 34e0e4b3ecfe72..aaa29e491bc1bb 100644 --- a/Modules/_zstd/clinic/zstddict.c.h +++ b/Modules/_zstd/clinic/zstddict.c.h @@ -6,7 +6,6 @@ preserve # include "pycore_gc.h" // PyGC_Head # include "pycore_runtime.h" // _Py_ID() #endif -#include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION() #include "pycore_modsupport.h" // _PyArg_UnpackKeywords() PyDoc_STRVAR(_zstd_ZstdDict_new__doc__, @@ -118,13 +117,7 @@ _zstd_ZstdDict_as_digested_dict_get_impl(ZstdDict *self); static PyObject * _zstd_ZstdDict_as_digested_dict_get(PyObject *self, void *Py_UNUSED(context)) { - PyObject *return_value = NULL; - - Py_BEGIN_CRITICAL_SECTION(self); - return_value = _zstd_ZstdDict_as_digested_dict_get_impl((ZstdDict *)self); - Py_END_CRITICAL_SECTION(); - - return return_value; + return _zstd_ZstdDict_as_digested_dict_get_impl((ZstdDict *)self); } PyDoc_STRVAR(_zstd_ZstdDict_as_undigested_dict__doc__, @@ -156,13 +149,7 @@ _zstd_ZstdDict_as_undigested_dict_get_impl(ZstdDict *self); static PyObject * _zstd_ZstdDict_as_undigested_dict_get(PyObject *self, void *Py_UNUSED(context)) { - PyObject *return_value = NULL; - - Py_BEGIN_CRITICAL_SECTION(self); - return_value = _zstd_ZstdDict_as_undigested_dict_get_impl((ZstdDict *)self); - Py_END_CRITICAL_SECTION(); - - return return_value; + return _zstd_ZstdDict_as_undigested_dict_get_impl((ZstdDict *)self); } PyDoc_STRVAR(_zstd_ZstdDict_as_prefix__doc__, @@ -194,12 +181,6 @@ _zstd_ZstdDict_as_prefix_get_impl(ZstdDict *self); static PyObject * _zstd_ZstdDict_as_prefix_get(PyObject *self, void *Py_UNUSED(context)) { - PyObject *return_value = NULL; - - Py_BEGIN_CRITICAL_SECTION(self); - return_value = _zstd_ZstdDict_as_prefix_get_impl((ZstdDict *)self); - Py_END_CRITICAL_SECTION(); - - return return_value; + return _zstd_ZstdDict_as_prefix_get_impl((ZstdDict *)self); } -/*[clinic end generated code: output=bfb31c1187477afd input=a9049054013a1b77]*/ +/*[clinic end generated code: output=8692eabee4e0d1fe input=a9049054013a1b77]*/ diff --git a/Modules/_zstd/zstddict.c b/Modules/_zstd/zstddict.c index aabb7ea0dc97a0..b97f15ba1687ae 100644 --- a/Modules/_zstd/zstddict.c +++ b/Modules/_zstd/zstddict.c @@ -146,7 +146,6 @@ static PyMemberDef ZstdDict_members[] = { }; /*[clinic input] -@critical_section @getter _zstd.ZstdDict.as_digested_dict @@ -163,13 +162,12 @@ Pass this attribute as zstd_dict argument: compress(dat, zstd_dict=zd.as_digeste static PyObject * _zstd_ZstdDict_as_digested_dict_get_impl(ZstdDict *self) -/*[clinic end generated code: output=09b086e7a7320dbb input=585448c79f31f74a]*/ +/*[clinic end generated code: output=09b086e7a7320dbb input=10cd2b6165931b77]*/ { return Py_BuildValue("Oi", self, DICT_TYPE_DIGESTED); } /*[clinic input] -@critical_section @getter _zstd.ZstdDict.as_undigested_dict @@ -184,13 +182,12 @@ Pass this attribute as zstd_dict argument: compress(dat, zstd_dict=zd.as_undiges static PyObject * _zstd_ZstdDict_as_undigested_dict_get_impl(ZstdDict *self) -/*[clinic end generated code: output=43c7a989e6d4253a input=022b0829ffb1c220]*/ +/*[clinic end generated code: output=43c7a989e6d4253a input=11e5f5df690a85b4]*/ { return Py_BuildValue("Oi", self, DICT_TYPE_UNDIGESTED); } /*[clinic input] -@critical_section @getter _zstd.ZstdDict.as_prefix @@ -205,7 +202,7 @@ Pass this attribute as zstd_dict argument: compress(dat, zstd_dict=zd.as_prefix) static PyObject * _zstd_ZstdDict_as_prefix_get_impl(ZstdDict *self) -/*[clinic end generated code: output=6f7130c356595a16 input=09fb82a6a5407e87]*/ +/*[clinic end generated code: output=6f7130c356595a16 input=b028e0ae6ec4292b]*/ { return Py_BuildValue("Oi", self, DICT_TYPE_PREFIX); } From d8d48c2deb75374d754806a698c0a995f10c1d41 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 09:52:38 -0400 Subject: [PATCH 14/18] Add missing include --- Modules/_zstd/zstddict.c | 1 + 1 file changed, 1 insertion(+) diff --git a/Modules/_zstd/zstddict.c b/Modules/_zstd/zstddict.c index b97f15ba1687ae..39828c9b36b5c2 100644 --- a/Modules/_zstd/zstddict.c +++ b/Modules/_zstd/zstddict.c @@ -17,6 +17,7 @@ class _zstd.ZstdDict "ZstdDict *" "&zstd_dict_type_spec" #include "_zstdmodule.h" #include "zstddict.h" #include "clinic/zstddict.c.h" +#include "internal/pycore_lock.h" // PyMutex_IsLocked #include // ZSTD_freeDDict(), ZSTD_getDictID_fromDict() From d142aa849f56972b20a8df1e588b5dd540d7276c Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Wed, 21 May 2025 16:52:49 -0400 Subject: [PATCH 15/18] Updates based on Sam's review - Assert locks are held in `_get_(C/D)Dict` - Update self->d_dict outside `Py_BEGIN_ALLOW_THREADS` - removed critical section AC I missed --- Modules/_zstd/clinic/decompressor.c.h | 11 ++--------- Modules/_zstd/compressor.c | 10 +++++++--- Modules/_zstd/decompressor.c | 13 +++++-------- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/Modules/_zstd/clinic/decompressor.c.h b/Modules/_zstd/clinic/decompressor.c.h index 4ecb19e9bde6ed..c6fdae74ab0447 100644 --- a/Modules/_zstd/clinic/decompressor.c.h +++ b/Modules/_zstd/clinic/decompressor.c.h @@ -7,7 +7,6 @@ preserve # include "pycore_runtime.h" // _Py_ID() #endif #include "pycore_abstract.h" // _PyNumber_Index() -#include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION() #include "pycore_modsupport.h" // _PyArg_UnpackKeywords() PyDoc_STRVAR(_zstd_ZstdDecompressor_new__doc__, @@ -114,13 +113,7 @@ _zstd_ZstdDecompressor_unused_data_get_impl(ZstdDecompressor *self); static PyObject * _zstd_ZstdDecompressor_unused_data_get(PyObject *self, void *Py_UNUSED(context)) { - PyObject *return_value = NULL; - - Py_BEGIN_CRITICAL_SECTION(self); - return_value = _zstd_ZstdDecompressor_unused_data_get_impl((ZstdDecompressor *)self); - Py_END_CRITICAL_SECTION(); - - return return_value; + return _zstd_ZstdDecompressor_unused_data_get_impl((ZstdDecompressor *)self); } PyDoc_STRVAR(_zstd_ZstdDecompressor_decompress__doc__, @@ -227,4 +220,4 @@ _zstd_ZstdDecompressor_decompress(PyObject *self, PyObject *const *args, Py_ssiz return return_value; } -/*[clinic end generated code: output=7a4d278f9244e684 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=30c12ef047027ede input=a9049054013a1b77]*/ diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 7e460c4a77ce86..9f216f36872866 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -153,6 +153,7 @@ capsule_free_cdict(PyObject *capsule) ZSTD_CDict * _get_CDict(ZstdDict *self, int compressionLevel) { + assert(PyMutex_IsLocked(&self->lock)); PyObject *level = NULL; PyObject *capsule; ZSTD_CDict *cdict; @@ -197,11 +198,14 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Add PyCapsule object to self->c_dicts if not already inserted */ - if (PyDict_SetItem(self->c_dicts, level, capsule) < 0) { - Py_DECREF(capsule); + PyObject *capsule_dict; + int ret = PyDict_SetDefaultRef(self->c_dicts, level, capsule, + &capsule_dict); + Py_XDECREF(capsule_dict); + Py_DECREF(capsule); + if (ret < 0) { goto error; } - Py_DECREF(capsule); } else { /* ZSTD_CDict instance already exists */ diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index 73f8fb3e5176a9..65367ec1844410 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -58,6 +58,7 @@ typedef struct { static inline ZSTD_DDict * _get_DDict(ZstdDict *self) { + assert(PyMutex_IsLocked(&self->lock)); ZSTD_DDict *ret; /* Already created */ @@ -70,9 +71,9 @@ _get_DDict(ZstdDict *self) char *dict_buffer = PyBytes_AS_STRING(self->dict_content); Py_ssize_t dict_len = Py_SIZE(self->dict_content); Py_BEGIN_ALLOW_THREADS - self->d_dict = ZSTD_createDDict(dict_buffer, - dict_len); + ret = ZSTD_createDDict(dict_buffer, dict_len); Py_END_ALLOW_THREADS + self->d_dict = ret; if (self->d_dict == NULL) { _zstd_state* const mod_state = PyType_GetModuleState(Py_TYPE(self)); @@ -84,10 +85,7 @@ _get_DDict(ZstdDict *self) } } - /* Don't lose any exception */ - ret = self->d_dict; - - return ret; + return self->d_dict; } /* Set decompression parameters to decompression context */ @@ -629,7 +627,6 @@ ZstdDecompressor_dealloc(PyObject *ob) } /*[clinic input] -@critical_section @getter _zstd.ZstdDecompressor.unused_data @@ -641,7 +638,7 @@ decompressed, unused input data after the frame. Otherwise this will be b''. static PyObject * _zstd_ZstdDecompressor_unused_data_get_impl(ZstdDecompressor *self) -/*[clinic end generated code: output=f3a20940f11b6b09 input=5233800bef00df04]*/ +/*[clinic end generated code: output=f3a20940f11b6b09 input=54d41ecd681a3444]*/ { PyObject *ret; From 43d3dea9e9b20a4d6918608b09a4bc05747b95c3 Mon Sep 17 00:00:00 2001 From: Emma Smith Date: Thu, 22 May 2025 14:27:32 -0700 Subject: [PATCH 16/18] Pass NULL to PyDict_SetDefaultRef Co-authored-by: Kumar Aditya --- Modules/_zstd/compressor.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index 9f216f36872866..f38f436d9062c2 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -198,10 +198,7 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Add PyCapsule object to self->c_dicts if not already inserted */ - PyObject *capsule_dict; - int ret = PyDict_SetDefaultRef(self->c_dicts, level, capsule, - &capsule_dict); - Py_XDECREF(capsule_dict); + int ret = PyDict_SetDefaultRef(self->c_dicts, level, capsule, NULL); Py_DECREF(capsule); if (ret < 0) { goto error; From 93dec23d0e408df618c2c942b0181edd6bcd6f8d Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Thu, 22 May 2025 17:22:56 -0700 Subject: [PATCH 17/18] Put locks around relevant functions and use PyDict_GetItemRef --- Modules/_zstd/compressor.c | 20 ++++++++++++-------- Modules/_zstd/decompressor.c | 2 ++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index f38f436d9062c2..de207636477ac3 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -155,8 +155,9 @@ _get_CDict(ZstdDict *self, int compressionLevel) { assert(PyMutex_IsLocked(&self->lock)); PyObject *level = NULL; - PyObject *capsule; + PyObject *capsule = NULL; ZSTD_CDict *cdict; + int ret; /* int level object */ @@ -166,11 +167,12 @@ _get_CDict(ZstdDict *self, int compressionLevel) } /* Get PyCapsule object from self->c_dicts */ - capsule = PyDict_GetItemWithError(self->c_dicts, level); + ret = PyDict_GetItemRef(self->c_dicts, level, &capsule); + if (ret < 0) { + Py_XDECREF(capsule); + goto error; + } if (capsule == NULL) { - if (PyErr_Occurred()) { - goto error; - } /* Create ZSTD_CDict instance */ char *dict_buffer = PyBytes_AS_STRING(self->dict_content); Py_ssize_t dict_len = Py_SIZE(self->dict_content); @@ -197,9 +199,8 @@ _get_CDict(ZstdDict *self, int compressionLevel) goto error; } - /* Add PyCapsule object to self->c_dicts if not already inserted */ - int ret = PyDict_SetDefaultRef(self->c_dicts, level, capsule, NULL); - Py_DECREF(capsule); + /* Add PyCapsule object to self->c_dicts */ + ret = PyDict_SetItem(self->c_dicts, level, capsule); if (ret < 0) { goto error; } @@ -214,6 +215,7 @@ _get_CDict(ZstdDict *self, int compressionLevel) cdict = NULL; success: Py_XDECREF(level); + Py_XDECREF(capsule); return cdict; } @@ -422,6 +424,7 @@ static PyObject * compress_lock_held(ZstdCompressor *self, Py_buffer *data, ZSTD_EndDirective end_directive) { + assert(PyMutex_IsLocked(&self->lock)); ZSTD_inBuffer in; ZSTD_outBuffer out; _BlocksOutputBuffer buffer = {.list = NULL}; @@ -504,6 +507,7 @@ mt_continue_should_break(ZSTD_inBuffer *in, ZSTD_outBuffer *out) static PyObject * compress_mt_continue_lock_held(ZstdCompressor *self, Py_buffer *data) { + assert(PyMutex_IsLocked(&self->lock)); ZSTD_inBuffer in; ZSTD_outBuffer out; _BlocksOutputBuffer buffer = {.list = NULL}; diff --git a/Modules/_zstd/decompressor.c b/Modules/_zstd/decompressor.c index 65367ec1844410..e299f73b071353 100644 --- a/Modules/_zstd/decompressor.c +++ b/Modules/_zstd/decompressor.c @@ -343,6 +343,7 @@ decompress_lock_held(ZstdDecompressor *self, ZSTD_inBuffer *in, static void decompressor_reset_session_lock_held(ZstdDecompressor *self) { + assert(PyMutex_IsLocked(&self->lock)); /* Reset variables */ self->in_begin = 0; @@ -362,6 +363,7 @@ static PyObject * stream_decompress_lock_held(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length) { + assert(PyMutex_IsLocked(&self->lock)); ZSTD_inBuffer in; PyObject *ret = NULL; int use_input_buffer; From 69a755bd828ed2729a3a622d3a92c353de90b184 Mon Sep 17 00:00:00 2001 From: Emma Harper Smith Date: Thu, 22 May 2025 17:41:48 -0700 Subject: [PATCH 18/18] Remove unused decref --- Modules/_zstd/compressor.c | 1 - 1 file changed, 1 deletion(-) diff --git a/Modules/_zstd/compressor.c b/Modules/_zstd/compressor.c index de207636477ac3..8f934858ef784f 100644 --- a/Modules/_zstd/compressor.c +++ b/Modules/_zstd/compressor.c @@ -169,7 +169,6 @@ _get_CDict(ZstdDict *self, int compressionLevel) /* Get PyCapsule object from self->c_dicts */ ret = PyDict_GetItemRef(self->c_dicts, level, &capsule); if (ret < 0) { - Py_XDECREF(capsule); goto error; } if (capsule == NULL) {