From 220cf99b259cb2db9c97edfcd5266137a1c4f4ca Mon Sep 17 00:00:00 2001 From: ShaharNaveh <50263213+ShaharNaveh@users.noreply.github.com> Date: Tue, 8 Jul 2025 13:31:31 +0300 Subject: [PATCH] Update gzip from 3.13.5 --- Lib/gzip.py | 129 ++++++++++++++++++++++++------------------ Lib/test/test_gzip.py | 24 ++++---- 2 files changed, 86 insertions(+), 67 deletions(-) diff --git a/Lib/gzip.py b/Lib/gzip.py index 1a3c82ce7e..a550c20a7a 100644 --- a/Lib/gzip.py +++ b/Lib/gzip.py @@ -5,11 +5,15 @@ # based on Andrew Kuchling's minigzip.py distributed with the zlib module -import struct, sys, time, os -import zlib +import _compression import builtins import io -import _compression +import os +import struct +import sys +import time +import weakref +import zlib __all__ = ["BadGzipFile", "GzipFile", "open", "compress", "decompress"] @@ -125,10 +129,13 @@ class BadGzipFile(OSError): class _WriteBufferStream(io.RawIOBase): """Minimal object to pass WriteBuffer flushes into GzipFile""" def __init__(self, gzip_file): - self.gzip_file = gzip_file + self.gzip_file = weakref.ref(gzip_file) def write(self, data): - return self.gzip_file._write_raw(data) + gzip_file = self.gzip_file() + if gzip_file is None: + raise RuntimeError("lost gzip_file") + return gzip_file._write_raw(data) def seekable(self): return False @@ -190,51 +197,58 @@ def __init__(self, filename=None, mode=None, raise ValueError("Invalid mode: {!r}".format(mode)) if mode and 'b' not in mode: mode += 'b' - if fileobj is None: - fileobj = self.myfileobj = builtins.open(filename, mode or 'rb') - if filename is None: - filename = getattr(fileobj, 'name', '') - if not isinstance(filename, (str, bytes)): - filename = '' - else: - filename = os.fspath(filename) - origmode = mode - if mode is None: - mode = getattr(fileobj, 'mode', 'rb') - - - if mode.startswith('r'): - self.mode = READ - raw = _GzipReader(fileobj) - self._buffer = io.BufferedReader(raw) - self.name = filename - - elif mode.startswith(('w', 'a', 'x')): - if origmode is None: - import warnings - warnings.warn( - "GzipFile was opened for writing, but this will " - "change in future Python releases. " - "Specify the mode argument for opening it for writing.", - FutureWarning, 2) - self.mode = WRITE - self._init_write(filename) - self.compress = zlib.compressobj(compresslevel, - zlib.DEFLATED, - -zlib.MAX_WBITS, - zlib.DEF_MEM_LEVEL, - 0) - self._write_mtime = mtime - self._buffer_size = _WRITE_BUFFER_SIZE - self._buffer = io.BufferedWriter(_WriteBufferStream(self), - buffer_size=self._buffer_size) - else: - raise ValueError("Invalid mode: {!r}".format(mode)) - self.fileobj = fileobj + try: + if fileobj is None: + fileobj = self.myfileobj = builtins.open(filename, mode or 'rb') + if filename is None: + filename = getattr(fileobj, 'name', '') + if not isinstance(filename, (str, bytes)): + filename = '' + else: + filename = os.fspath(filename) + origmode = mode + if mode is None: + mode = getattr(fileobj, 'mode', 'rb') + + + if mode.startswith('r'): + self.mode = READ + raw = _GzipReader(fileobj) + self._buffer = io.BufferedReader(raw) + self.name = filename + + elif mode.startswith(('w', 'a', 'x')): + if origmode is None: + import warnings + warnings.warn( + "GzipFile was opened for writing, but this will " + "change in future Python releases. " + "Specify the mode argument for opening it for writing.", + FutureWarning, 2) + self.mode = WRITE + self._init_write(filename) + self.compress = zlib.compressobj(compresslevel, + zlib.DEFLATED, + -zlib.MAX_WBITS, + zlib.DEF_MEM_LEVEL, + 0) + self._write_mtime = mtime + self._buffer_size = _WRITE_BUFFER_SIZE + self._buffer = io.BufferedWriter(_WriteBufferStream(self), + buffer_size=self._buffer_size) + else: + raise ValueError("Invalid mode: {!r}".format(mode)) - if self.mode == WRITE: - self._write_gzip_header(compresslevel) + self.fileobj = fileobj + + if self.mode == WRITE: + self._write_gzip_header(compresslevel) + except: + # Avoid a ResourceWarning if the write fails, + # eg read-only file or KeyboardInterrupt + self._close() + raise @property def mtime(self): @@ -363,11 +377,14 @@ def close(self): elif self.mode == READ: self._buffer.close() finally: - self.fileobj = None - myfileobj = self.myfileobj - if myfileobj: - self.myfileobj = None - myfileobj.close() + self._close() + + def _close(self): + self.fileobj = None + myfileobj = self.myfileobj + if myfileobj is not None: + self.myfileobj = None + myfileobj.close() def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH): self._check_not_closed() @@ -580,12 +597,12 @@ def _rewind(self): self._new_member = True -def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=0): +def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=None): """Compress data in one shot and return the compressed string. compresslevel sets the compression level in range of 0-9. - mtime can be used to set the modification time. - The modification time is set to 0 by default, for reproducibility. + mtime can be used to set the modification time. The modification time is + set to the current time by default. """ # Wbits=31 automatically includes a gzip header and trailer. gzip_data = zlib.compress(data, level=compresslevel, wbits=31) diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py index 42bb7d6984..b0d9613cdb 100644 --- a/Lib/test/test_gzip.py +++ b/Lib/test/test_gzip.py @@ -3,12 +3,14 @@ import array import functools +import gc import io import os import struct import sys import unittest from subprocess import PIPE, Popen +from test.support import catch_unraisable_exception from test.support import import_helper from test.support import os_helper from test.support import _4G, bigmemtest, requires_subprocess @@ -713,17 +715,6 @@ def test_compress_mtime(self): f.read(1) # to set mtime attribute self.assertEqual(f.mtime, mtime) - def test_compress_mtime_default(self): - # test for gh-125260 - datac = gzip.compress(data1, mtime=0) - datac2 = gzip.compress(data1) - self.assertEqual(datac, datac2) - datac3 = gzip.compress(data1, mtime=None) - self.assertNotEqual(datac, datac3) - with gzip.GzipFile(fileobj=io.BytesIO(datac3), mode="rb") as f: - f.read(1) # to set mtime attribute - self.assertGreater(f.mtime, 1) - def test_compress_correct_level(self): for mtime in (0, 42): with self.subTest(mtime=mtime): @@ -859,6 +850,17 @@ def test_write_seek_write(self): self.assertEqual(gzip.decompress(data), message * 2) + def test_refloop_unraisable(self): + # Ensure a GzipFile referring to a temporary fileobj deletes cleanly. + # Previously an unraisable exception would occur on close because the + # fileobj would be closed before the GzipFile as the result of a + # reference loop. See issue gh-129726 + with catch_unraisable_exception() as cm: + gzip.GzipFile(fileobj=io.BytesIO(), mode="w") + gc.collect() + self.assertIsNone(cm.unraisable) + + class TestOpen(BaseTest): def test_binary_modes(self): uncompressed = data1 * 50