Skip to content

Update gzip from 3.13.5 #5912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 73 additions & 56 deletions Lib/gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

# based on Andrew Kuchling's minigzip.py distributed with the zlib module

import struct, sys, time, os
import zlib
import _compression
import builtins
import io
import _compression
import os
import struct
import sys
import time
import weakref
import zlib

__all__ = ["BadGzipFile", "GzipFile", "open", "compress", "decompress"]

Expand Down Expand Up @@ -125,10 +129,13 @@ class BadGzipFile(OSError):
class _WriteBufferStream(io.RawIOBase):
"""Minimal object to pass WriteBuffer flushes into GzipFile"""
def __init__(self, gzip_file):
self.gzip_file = gzip_file
self.gzip_file = weakref.ref(gzip_file)

def write(self, data):
return self.gzip_file._write_raw(data)
gzip_file = self.gzip_file()
if gzip_file is None:
raise RuntimeError("lost gzip_file")
return gzip_file._write_raw(data)

def seekable(self):
return False
Expand Down Expand Up @@ -190,51 +197,58 @@ def __init__(self, filename=None, mode=None,
raise ValueError("Invalid mode: {!r}".format(mode))
if mode and 'b' not in mode:
mode += 'b'
if fileobj is None:
fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
if filename is None:
filename = getattr(fileobj, 'name', '')
if not isinstance(filename, (str, bytes)):
filename = ''
else:
filename = os.fspath(filename)
origmode = mode
if mode is None:
mode = getattr(fileobj, 'mode', 'rb')


if mode.startswith('r'):
self.mode = READ
raw = _GzipReader(fileobj)
self._buffer = io.BufferedReader(raw)
self.name = filename

elif mode.startswith(('w', 'a', 'x')):
if origmode is None:
import warnings
warnings.warn(
"GzipFile was opened for writing, but this will "
"change in future Python releases. "
"Specify the mode argument for opening it for writing.",
FutureWarning, 2)
self.mode = WRITE
self._init_write(filename)
self.compress = zlib.compressobj(compresslevel,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0)
self._write_mtime = mtime
self._buffer_size = _WRITE_BUFFER_SIZE
self._buffer = io.BufferedWriter(_WriteBufferStream(self),
buffer_size=self._buffer_size)
else:
raise ValueError("Invalid mode: {!r}".format(mode))

self.fileobj = fileobj
try:
if fileobj is None:
fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
if filename is None:
filename = getattr(fileobj, 'name', '')
if not isinstance(filename, (str, bytes)):
filename = ''
else:
filename = os.fspath(filename)
origmode = mode
if mode is None:
mode = getattr(fileobj, 'mode', 'rb')


if mode.startswith('r'):
self.mode = READ
raw = _GzipReader(fileobj)
self._buffer = io.BufferedReader(raw)
self.name = filename

elif mode.startswith(('w', 'a', 'x')):
if origmode is None:
import warnings
warnings.warn(
"GzipFile was opened for writing, but this will "
"change in future Python releases. "
"Specify the mode argument for opening it for writing.",
FutureWarning, 2)
self.mode = WRITE
self._init_write(filename)
self.compress = zlib.compressobj(compresslevel,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0)
self._write_mtime = mtime
self._buffer_size = _WRITE_BUFFER_SIZE
self._buffer = io.BufferedWriter(_WriteBufferStream(self),
buffer_size=self._buffer_size)
else:
raise ValueError("Invalid mode: {!r}".format(mode))

if self.mode == WRITE:
self._write_gzip_header(compresslevel)
self.fileobj = fileobj

if self.mode == WRITE:
self._write_gzip_header(compresslevel)
except:
# Avoid a ResourceWarning if the write fails,
# eg read-only file or KeyboardInterrupt
self._close()
raise

@property
def mtime(self):
Expand Down Expand Up @@ -363,11 +377,14 @@ def close(self):
elif self.mode == READ:
self._buffer.close()
finally:
self.fileobj = None
myfileobj = self.myfileobj
if myfileobj:
self.myfileobj = None
myfileobj.close()
self._close()

def _close(self):
self.fileobj = None
myfileobj = self.myfileobj
if myfileobj is not None:
self.myfileobj = None
myfileobj.close()

def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH):
self._check_not_closed()
Expand Down Expand Up @@ -580,12 +597,12 @@ def _rewind(self):
self._new_member = True


def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=0):
def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=None):
"""Compress data in one shot and return the compressed string.

compresslevel sets the compression level in range of 0-9.
mtime can be used to set the modification time.
The modification time is set to 0 by default, for reproducibility.
mtime can be used to set the modification time. The modification time is
set to the current time by default.
"""
# Wbits=31 automatically includes a gzip header and trailer.
gzip_data = zlib.compress(data, level=compresslevel, wbits=31)
Expand Down
24 changes: 13 additions & 11 deletions Lib/test/test_gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

import array
import functools
import gc
import io
import os
import struct
import sys
import unittest
from subprocess import PIPE, Popen
from test.support import catch_unraisable_exception
from test.support import import_helper
from test.support import os_helper
from test.support import _4G, bigmemtest, requires_subprocess
Expand Down Expand Up @@ -713,17 +715,6 @@ def test_compress_mtime(self):
f.read(1) # to set mtime attribute
self.assertEqual(f.mtime, mtime)

def test_compress_mtime_default(self):
# test for gh-125260
datac = gzip.compress(data1, mtime=0)
datac2 = gzip.compress(data1)
self.assertEqual(datac, datac2)
datac3 = gzip.compress(data1, mtime=None)
self.assertNotEqual(datac, datac3)
with gzip.GzipFile(fileobj=io.BytesIO(datac3), mode="rb") as f:
f.read(1) # to set mtime attribute
self.assertGreater(f.mtime, 1)

def test_compress_correct_level(self):
for mtime in (0, 42):
with self.subTest(mtime=mtime):
Expand Down Expand Up @@ -859,6 +850,17 @@ def test_write_seek_write(self):
self.assertEqual(gzip.decompress(data), message * 2)


def test_refloop_unraisable(self):
# Ensure a GzipFile referring to a temporary fileobj deletes cleanly.
# Previously an unraisable exception would occur on close because the
# fileobj would be closed before the GzipFile as the result of a
# reference loop. See issue gh-129726
with catch_unraisable_exception() as cm:
gzip.GzipFile(fileobj=io.BytesIO(), mode="w")
gc.collect()
self.assertIsNone(cm.unraisable)


class TestOpen(BaseTest):
def test_binary_modes(self):
uncompressed = data1 * 50
Expand Down
Loading