Skip to content

gh-113924: Add copy from zipfile without decompress/recompressing data #125718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
3405a33
gh-113924: add tests for existing exceptions in zipfile stl (#113924)
sunrisesarsaparilla Oct 18, 2024
7102f29
gh-113924: decouple ZipFile write mode from read mode (#113924)
sunrisesarsaparilla Oct 19, 2024
be07ad7
gh-113924: extract method for ZipFile reading logic (#113924)
sunrisesarsaparilla Oct 18, 2024
8bdfab4
gh-113924: extract method to raise if cannot write to zipfile (#113924)
sunrisesarsaparilla Oct 18, 2024
437917a
gh-113924: extract method to setup writing to zipfile (#113924)
sunrisesarsaparilla Oct 18, 2024
36a4a33
gh-113924: make ZipFile encrypted files reusable in tests (#113924)
sunrisesarsaparilla Oct 18, 2024
f95c9a8
gh-113924: add method and tests to copy file from ZipFile (#113924)
sunrisesarsaparilla Oct 18, 2024
7e9501e
gh-113924: add method and tests to copy many files from ZipFile (#113…
sunrisesarsaparilla Oct 18, 2024
ea8e00f
gh-113924: add method and tests to copy all files from ZipFile (#113924)
sunrisesarsaparilla Oct 18, 2024
6c5cb7e
add entry to Misc/NEWS.d
sunrisesarsaparilla Oct 19, 2024
2da2c4b
remove copy_file and copy_all_files and repurpose copy_files
sunrisesarsaparilla Oct 21, 2024
871f641
update NEWS.d
sunrisesarsaparilla Oct 21, 2024
1a58aac
add documentation for copy_files
sunrisesarsaparilla Oct 21, 2024
86a0b82
remove unused method
sunrisesarsaparilla Oct 21, 2024
85f1c15
Merge branch 'python:main' into zipfile/write/copy
sunrisesarsaparilla Feb 10, 2025
c31ad76
Merge branch 'python:main' into zipfile/write/copy
sunrisesarsaparilla Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 205 additions & 10 deletions Lib/test/test_zipfile/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import unittest
import unittest.mock as mock
import zipfile
import filecmp


from tempfile import TemporaryFile
Expand Down Expand Up @@ -610,14 +611,15 @@ def test_io_on_closed_zipextfile(self):
self.assertRaises(ValueError, fid.seek, 0)
self.assertRaises(ValueError, fid.tell)

def test_write_to_readonly(self):
"""Check that trying to call write() on a readonly ZipFile object
def test_writing_to_readonly(self):
"""Check that trying to write to a readonly ZipFile object
raises a ValueError."""
with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
zipfp.writestr("somefile.txt", "bogus")

with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
self.assertRaises(ValueError, zipfp.write, TESTFN)
self.assertRaises(ValueError, zipfp.writestr, TESTFN, "data")

with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
with self.assertRaises(ValueError):
Expand Down Expand Up @@ -2057,6 +2059,7 @@ def test_closed_zip_raises_ValueError(self):
# and report that the first file in the archive was corrupt.
self.assertRaises(ValueError, zipf.read, "foo.txt")
self.assertRaises(ValueError, zipf.open, "foo.txt")
self.assertRaises(ValueError, zipf.open, "foo.txt", "w")
self.assertRaises(ValueError, zipf.testzip)
self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
with open(TESTFN, 'w', encoding='utf-8') as f:
Expand Down Expand Up @@ -2307,6 +2310,16 @@ def test_zipfile_with_short_extra_field(self):
# testzip returns the name of the first corrupt file, or None
self.assertIsNone(zipf.testzip())

def test_open_for_write_issues_exception_when_pwd_provided(self):
with zipfile.ZipFile(TESTFN2, 'w') as zipf:
with self.assertRaises(ValueError):
zipf.open("foo.txt", mode='w', pwd="password")

def test_open_for_write_issues_exception_when_force_zip_not_allowed(self):
with zipfile.ZipFile(TESTFN2, 'w', allowZip64=False) as zipf:
with self.assertRaises(ValueError):
zipf.open("foo.txt", mode='w', force_zip64=True)

def test_open_conflicting_handles(self):
# It's only possible to open one writable file handle at a time
msg1 = b"It's fun to charter an accountant!"
Expand All @@ -2320,6 +2333,8 @@ def test_open_conflicting_handles(self):
zipf.open('handle', mode='w')
with self.assertRaises(ValueError):
zipf.open('foo', mode='r')
with self.assertRaises(ValueError):
zipf.read('foo')
with self.assertRaises(ValueError):
zipf.writestr('str', 'abcde')
with self.assertRaises(ValueError):
Expand Down Expand Up @@ -2670,20 +2685,22 @@ class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
b'\x00>\x00\x00\x00\x00\x00')


class DecryptionTests(unittest.TestCase):
"""Check that ZIP decryption works. Since the library does not
support encryption at the moment, we use a pre-generated encrypted
ZIP file."""
class EncryptedFiles:
""" Since the library does not support encryption at the moment,
we use pre-generated encrypted ZIP files."""

data = (
encrypted_zip1_data = (
b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
b'\x00\x00L\x00\x00\x00\x00\x00' )
data2 = (

zip1_filename = "test.txt"

encrypted_zip2_data = (
b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
Expand All @@ -2693,15 +2710,19 @@ class DecryptionTests(unittest.TestCase):
b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )


class DecryptionTests(EncryptedFiles, unittest.TestCase):
"""Check that ZIP decryption works."""

plain = b'zipfile.py encryption test'
plain2 = b'\x00'*512

def setUp(self):
with open(TESTFN, "wb") as fp:
fp.write(self.data)
fp.write(self.encrypted_zip1_data)
self.zip = zipfile.ZipFile(TESTFN, "r")
with open(TESTFN2, "wb") as fp:
fp.write(self.data2)
fp.write(self.encrypted_zip2_data)
self.zip2 = zipfile.ZipFile(TESTFN2, "r")

def tearDown(self):
Expand Down Expand Up @@ -2786,6 +2807,180 @@ def test_seek_tell(self):
fp.read()


class AbstractCopyFileTests(EncryptedFiles):
@classmethod
def write_small_file(cls, destination):
destination.writestr(cls.small_file, cls.small_data,
compress_type=cls.compression)

@classmethod
def write_large_file(cls, destination):
destination.writestr(cls.large_file, cls.large_data,
compress_type=cls.compression)

@classmethod
def setUpClass(cls):
cls.small_zip = TESTFN + "_small.zip"
cls.large_zip = TESTFN + "_large.zip"
cls.small_large_zip = TESTFN + "_small_large.zip"
cls.emtpy_dir_zip = TESTFN + "_empty_dir.zip"
cls.encrypted_zip = TESTFN + "_encrypted.zip"

# compressed size is larger than the source contents
cls.small_data = "a"
# compressed size is smaller than the source contents
cls.large_data = "b" * 400

cls.small_file = "small.txt"
cls.large_file = "large.txt"
cls.emtpy_dir_name = "directory/"

# create zipfiles to compare against
# these zipfiles should never be written to outside of this method
with zipfile.ZipFile(cls.small_zip, "w") as destination:
cls.write_small_file(destination)

with zipfile.ZipFile(cls.large_zip, "w") as destination:
cls.write_large_file(destination)

with zipfile.ZipFile(cls.small_large_zip, "w") as destination:
cls.write_small_file(destination)
cls.write_large_file(destination)

with zipfile.ZipFile(cls.emtpy_dir_zip, "w") as destination:
# use mode other than default to check that mode is copied
destination.mkdir(cls.emtpy_dir_name, mode=123)

with open(cls.encrypted_zip, "wb") as destination:
destination.write(cls.encrypted_zip1_data)

def tearDown(cls):
if os.path.exists(TESTFN):
unlink(TESTFN)

@classmethod
def tearDownClass(cls):
unlink(cls.small_zip)
unlink(cls.large_zip)
unlink(cls.small_large_zip)
unlink(cls.emtpy_dir_zip)
unlink(cls.encrypted_zip)

def assertIdentical(self, file1, file2):
self.assertTrue(filecmp.cmp(file1, file2, shallow=False))

def _test_copy_files(self, source_zipfile, filenames=None):
with zipfile.ZipFile(TESTFN, 'w') as destination:
destination.copy_files(source_zipfile, filenames)
self.assertIdentical(TESTFN, source_zipfile)

# A compressed file can be larger than its uncompressed form,
# which are two different states we need to test

# Copying tests with one small file
def test_copy_one_small_file_via_iterable(self):
self._test_copy_files(self.small_zip, [self.small_file])

def test_copy_one_small_file_via_whole_zipfile(self):
self._test_copy_files(self.small_zip)

# Copying tests with one large file
def test_copy_one_large_file_via_iterable(self):
self._test_copy_files(self.large_zip, [self.large_file])

def test_copy_one_large_file_via_whole_zipfile(self):
self._test_copy_files(self.large_zip)

# Copying tests with empty directory
def test_copy_directory_via_iterable(self):
self._test_copy_files(self.emtpy_dir_zip, [self.emtpy_dir_name])

def test_copy_directory_via_whole_zipfile(self):
self._test_copy_files(self.emtpy_dir_zip)

# Copying tests with encrypted file
def test_copy_encrypted_file_via_iterable(self):
self._test_copy_files(self.encrypted_zip, [self.zip1_filename])

def test_copy_encrypted_file_via_whole_zipfile(self):
self._test_copy_files(self.encrypted_zip)

# Copying tests with nonempty destination zipfile
def _test_nonempty_zipfile(self, source_zipfile, filenames=None):
with zipfile.ZipFile(TESTFN, 'w') as destination:
self.write_small_file(destination)
destination.copy_files(source_zipfile, filenames)
self.assertIdentical(TESTFN, self.small_large_zip)

def test_copy_to_nonempty_zipfile_via_iterable(self):
self._test_nonempty_zipfile(self.large_zip, [self.large_file])

def test_copy_to_nonempty_zipfile_via_whole_zipfile(self):
self._test_nonempty_zipfile(self.large_zip)

# Copying tests with two files
def test_copy_two_files_via_iterable(self):
self._test_copy_files(self.small_large_zip,
[self.small_file, self.large_file])

def test_copy_two_files_via_whole_zipfile(self):
self._test_copy_files(self.small_large_zip)

class StoredCopyFileTests(AbstractCopyFileTests, unittest.TestCase):
compression = zipfile.ZIP_STORED

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.exceptions_zip = TESTFN + "_exception"

with zipfile.ZipFile(cls.exceptions_zip,
"w") as destination:
cls.write_small_file(destination)

@classmethod
def tearDownClass(cls):
super().tearDownClass()
unlink(cls.exceptions_zip)

def copy_via_iterable(self, destination):
destination.copy_files(self.small_zip, [self.small_file])

def copy_entire_zipfile(self, destination):
destination.copy_files(self.small_zip)

def test_copy_methods_issue_exception_when_zipfile_not_open_for_write(self):
with zipfile.ZipFile(self.exceptions_zip, 'r') as destination:
self.assertRaises(ValueError, self.copy_via_iterable, destination)
self.assertRaises(ValueError, self.copy_entire_zipfile, destination)

def test_copy_file__issues_exception_when_already_writing(self):
with zipfile.ZipFile(self.exceptions_zip, 'w') as destination:
with destination.open('foo', mode='w') as open_file:
self.assertRaises(ValueError, self.copy_via_iterable,
destination)
self.assertRaises(ValueError, self.copy_entire_zipfile,
destination)

def test_copy_file__issues_exception_when_file_closed(self):
destination = zipfile.ZipFile(self.exceptions_zip, 'w')
destination.close()
self.assertRaises(ValueError, self.copy_via_iterable, destination)
self.assertRaises(ValueError, self.copy_entire_zipfile, destination)

@requires_zlib()
class DeflateCopyFileTests(AbstractCopyFileTests, unittest.TestCase):
compression = zipfile.ZIP_DEFLATED

@requires_bz2()
class Bzip2CopyFileTests(AbstractCopyFileTests, unittest.TestCase):
compression = zipfile.ZIP_BZIP2

@requires_lzma()
class LzmaCopyFileTests(AbstractCopyFileTests, unittest.TestCase):
compression = zipfile.ZIP_LZMA


class AbstractTestsWithRandomBinaryFiles:
@classmethod
def setUpClass(cls):
Expand Down
Loading
Loading