Skip to content

gh-51067: add ZipInfo.remove() #103033

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Doc/library/zipfile.rst
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,23 @@ ZipFile Objects
.. versionadded:: 3.11


.. method:: ZipFile.remove(zinfo_or_arcname)

Removes a member from the archive. *zinfo_or_arcname* is either the full
path of the member, or a :class:`ZipInfo` instance.

The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``.

Calling :meth:`remove` on a closed ZipFile will raise a :exc:`ValueError`.

.. note::

Removing a member in an archive may involve a move of many internal data
records, which can be I/O intensive for a large ZIP file.

.. versionadded:: 3.12
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.. versionadded:: 3.12
.. versionadded:: next



The following data attributes are also available:

.. attribute:: ZipFile.filename
Expand Down
233 changes: 233 additions & 0 deletions Lib/test/test_zipfile/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,239 @@ class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
compression = zipfile.ZIP_LZMA


class AbstractRemoveTests:

def _test_removing_indexes(self, test_files, indexes):
"""Test underlying _remove_members() for removing members at given
indexes."""
# calculate the expected results
expected_files = []
with zipfile.ZipFile(TESTFN, 'w') as zh:
for i, (file, data) in enumerate(test_files):
if i not in indexes:
zh.writestr(file, data)
expected_files.append(file)
expected_size = os.path.getsize(TESTFN)

# prepare the test zip
with zipfile.ZipFile(TESTFN, 'w') as zh:
for file, data in test_files:
zh.writestr(file, data)

# do the removal and check the result
with zipfile.ZipFile(TESTFN, 'a') as zh:
members = {zh.infolist()[i] for i in indexes}
zh._remove_members(members)

# make sure internal caches have reflected the change
# and are consistent
self.assertEqual(zh.namelist(), expected_files)
for file, _ in test_files:
if file in zh.namelist():
self.assertEqual(zh.getinfo(file).filename, file)
else:
with self.assertRaises(KeyError):
zh.getinfo(file)

self.assertIsNone(zh.testzip())
self.assertEqual(os.path.getsize(TESTFN), expected_size)

def _test_removing_combinations(self, test_files, n=None):
"""Test underlying _remove_members() for removing random combinations
of members."""
ln = len(test_files)
if n is None:
# iterate n from 1 to all
for n in range(1, ln + 1):
for indexes in itertools.combinations(range(ln), n):
with self.subTest(remove=indexes):
self._test_removing_indexes(test_files, indexes)
else:
for indexes in itertools.combinations(range(ln), n):
with self.subTest(remove=indexes):
self._test_removing_indexes(test_files, indexes)

def test_basic(self):
# Test underlying _remove_members() for removing random combinations of members.
test_files = [
('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'),
('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'),
('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'),
]

self._test_removing_combinations(test_files)

def test_duplicated_arcname(self):
# Test underlying _remove_members() for removing any one of random duplicated members.
dupl_file = 'file.txt'
test_files = [
('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'),
('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'),
('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'),
]

ln = len(test_files)
for n in range(2, ln + 1):
for dups in itertools.combinations(range(ln), n):
files = []
for i, (file, data) in enumerate(test_files):
file_ = dupl_file if i in dups else file
files.append((file_, data))

for index in dups:
indexes = [index]
with self.subTest(dups=dups, indexes=indexes):
self._test_removing_indexes(files, indexes)

def test_non_physical(self):
# Test underlying _remove_members() for non-physical removing.
test_files = [
('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'),
('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'),
('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'),
]

ln = len(test_files)
for n in range(1, ln + 1):
for indexes in itertools.combinations(range(ln), n):
with self.subTest(remove=indexes):
# prepare the test zip
expected = {}
with zipfile.ZipFile(TESTFN, 'w') as zh:
for i, (file, data) in enumerate(test_files):
zh.writestr(file, data)
if i not in indexes:
expected[file] = zh.getinfo(file).header_offset

# do the removal and check the result
with zipfile.ZipFile(TESTFN, 'a') as zh:
members = {zh.infolist()[i] for i in indexes}
zh._remove_members(members, remove_physical=False)
self.assertEqual(zh.namelist(), list(expected))
for file, offset in expected.items():
self.assertEqual(zh.getinfo(file).header_offset, offset)
self.assertIsNone(zh.testzip())

def test_verify(self):
# Test if params are passed to underlying _remove_members() correctly,
# or never passed if conditions not met.
file0 = 'file0.txt'
file = 'datafile.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'

# closed: error and do nothing
with zipfile.ZipFile(TESTFN, 'w') as zh:
zh.writestr(file, data)
with zipfile.ZipFile(TESTFN, 'a') as zh:
zh.close()
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
with self.assertRaises(ValueError):
zh.remove(file)
mock_fn.assert_not_called()

# writing: error and do nothing
with zipfile.ZipFile(TESTFN, 'w') as zh:
zh.writestr(file, data)
with zipfile.ZipFile(TESTFN, 'a') as zh:
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
with zh.open(file0, 'w') as fh:
with self.assertRaises(ValueError):
zh.remove(file)
mock_fn.assert_not_called()

# mode 'r': error and do nothing
with zipfile.ZipFile(TESTFN, 'r') as zh:
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
with self.assertRaises(ValueError):
zh.remove(file)
mock_fn.assert_not_called()

# mode 'a': the most general use case
with zipfile.ZipFile(TESTFN, 'w') as zh:
zh.writestr(file, data)
# -- remove with arcname
with zipfile.ZipFile(TESTFN, 'a') as zh:
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
zh.remove(file)
mock_fn.assert_called_once_with({zh.getinfo(file)})
# -- remove with zinfo
with zipfile.ZipFile(TESTFN, 'a') as zh:
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
zinfo = zh.getinfo(file)
zh.remove(zinfo)
mock_fn.assert_called_once_with({zinfo})
# -- remove with nonexist arcname
with zipfile.ZipFile(TESTFN, 'a') as zh:
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
with self.assertRaises(KeyError):
zh.remove('nonexist.file')
mock_fn.assert_not_called()
# -- remove with nonexist zinfo (even if same name)
with zipfile.ZipFile(TESTFN, 'a') as zh:
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
zinfo = zipfile.ZipInfo(file)
with self.assertRaises(KeyError):
zh.remove(zinfo)
mock_fn.assert_not_called()

# mode 'w': like 'a'; allows removing a just written member
with zipfile.ZipFile(TESTFN, 'w') as zh:
zh.writestr(file, data)
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
zh.remove(file)
mock_fn.assert_called_once_with({zh.getinfo(file)})

# mode 'x': like 'w'
os.remove(TESTFN)
with zipfile.ZipFile(TESTFN, 'x') as zh:
zh.writestr(file, data)
with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
zh.remove(file)
mock_fn.assert_called_once_with({zh.getinfo(file)})

def test_zip64(self):
# Test if members use zip64.
file = 'datafile.txt'
file1 = 'pre.txt'
file2 = 'post.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
data1 = b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'
data2 = b'Duis aute irure dolor in reprehenderit in voluptate velit esse'
with zipfile.ZipFile(TESTFN, 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
fh.write(data1)
with zh.open(file2, 'w', force_zip64=True) as fh:
fh.write(data2)
expected_size = os.path.getsize(TESTFN)

with zipfile.ZipFile(TESTFN, 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
fh.write(data1)
with zh.open(file, 'w', force_zip64=True) as fh:
fh.write(data)
with zh.open(file2, 'w', force_zip64=True) as fh:
fh.write(data2)
with zipfile.ZipFile(TESTFN, 'a') as zh:
zh.remove(file)
self.assertIsNone(zh.testzip())
self.assertEqual(os.path.getsize(TESTFN), expected_size)

class StoredRemoveTests(AbstractRemoveTests, unittest.TestCase):
compression = zipfile.ZIP_STORED

@requires_zlib()
class DeflateRemoveTests(AbstractRemoveTests, unittest.TestCase):
compression = zipfile.ZIP_DEFLATED

@requires_bz2()
class Bzip2RemoveTests(AbstractRemoveTests, unittest.TestCase):
compression = zipfile.ZIP_BZIP2

@requires_lzma()
class LzmaRemoveTests(AbstractRemoveTests, unittest.TestCase):
compression = zipfile.ZIP_LZMA


class PyZipFileTests(unittest.TestCase):
def assertCompiledIn(self, name, namelist):
if name + 'o' not in namelist:
Expand Down
63 changes: 63 additions & 0 deletions Lib/test/test_zipfile64.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,69 @@ def tearDown(self):
os_helper.unlink(TESTFN2)


class TestRemove(unittest.TestCase):
def setUp(self):
# Create test data.
line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
self.data = '\n'.join(line_gen).encode('ascii')

def _write_large_file(self, fh):
# It will contain enough copies of self.data to reach about 8 GiB.
filecount = 8*1024**3 // len(self.data)

next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
for num in range(filecount):
fh.write(self.data)
# Print still working message since this test can be really slow
if next_time <= time.monotonic():
next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
print((
' writing %d of %d, be patient...' %
(num, filecount)), file=sys.__stdout__)
sys.__stdout__.flush()

def test_remove_large_file(self):
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
self._test_remove_large_file(f)
self.assertFalse(f.closed)

def _test_remove_large_file(self, f):
file = 'datafile.txt'
file1 = 'dummy.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(f, 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
zh.writestr(file, data)

with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file1)
self.assertIsNone(zh.testzip())

def test_remove_before_large_file(self):
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
self._test_remove_before_large_file(f)
self.assertFalse(f.closed)

def _test_remove_before_large_file(self, f):
file = 'datafile.txt'
file1 = 'dummy.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(f, 'w') as zh:
zh.writestr(file, data)
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
expected_size = zh.getinfo(file1).file_size

with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file)
self.assertIsNone(zh.testzip())


class OtherTests(unittest.TestCase):
def testMoreThan64kFiles(self):
# This test checks that more than 64k files can be added to an archive,
Expand Down
Loading
Loading