Skip to content

Commit f2d0560

Browse files
CPython Developersyouknowone
CPython Developers
authored andcommitted
Update zipfile from CPython 3.12.2
1 parent 696dcea commit f2d0560

18 files changed

+2327
-784
lines changed

Lib/test/archiver_tests.py

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
"""Tests common to tarfile and zipfile."""
2+
3+
import os
4+
import sys
5+
6+
from test.support import os_helper
7+
8+
class OverwriteTests:
9+
10+
def setUp(self):
11+
os.makedirs(self.testdir)
12+
self.addCleanup(os_helper.rmtree, self.testdir)
13+
14+
def create_file(self, path, content=b''):
15+
with open(path, 'wb') as f:
16+
f.write(content)
17+
18+
def open(self, path):
19+
raise NotImplementedError
20+
21+
def extractall(self, ar):
22+
raise NotImplementedError
23+
24+
25+
def test_overwrite_file_as_file(self):
26+
target = os.path.join(self.testdir, 'test')
27+
self.create_file(target, b'content')
28+
with self.open(self.ar_with_file) as ar:
29+
self.extractall(ar)
30+
self.assertTrue(os.path.isfile(target))
31+
with open(target, 'rb') as f:
32+
self.assertEqual(f.read(), b'newcontent')
33+
34+
def test_overwrite_dir_as_dir(self):
35+
target = os.path.join(self.testdir, 'test')
36+
os.mkdir(target)
37+
with self.open(self.ar_with_dir) as ar:
38+
self.extractall(ar)
39+
self.assertTrue(os.path.isdir(target))
40+
41+
def test_overwrite_dir_as_implicit_dir(self):
42+
target = os.path.join(self.testdir, 'test')
43+
os.mkdir(target)
44+
with self.open(self.ar_with_implicit_dir) as ar:
45+
self.extractall(ar)
46+
self.assertTrue(os.path.isdir(target))
47+
self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
48+
with open(os.path.join(target, 'file'), 'rb') as f:
49+
self.assertEqual(f.read(), b'newcontent')
50+
51+
def test_overwrite_dir_as_file(self):
52+
target = os.path.join(self.testdir, 'test')
53+
os.mkdir(target)
54+
with self.open(self.ar_with_file) as ar:
55+
with self.assertRaises(PermissionError if sys.platform == 'win32'
56+
else IsADirectoryError):
57+
self.extractall(ar)
58+
self.assertTrue(os.path.isdir(target))
59+
60+
def test_overwrite_file_as_dir(self):
61+
target = os.path.join(self.testdir, 'test')
62+
self.create_file(target, b'content')
63+
with self.open(self.ar_with_dir) as ar:
64+
with self.assertRaises(FileExistsError):
65+
self.extractall(ar)
66+
self.assertTrue(os.path.isfile(target))
67+
with open(target, 'rb') as f:
68+
self.assertEqual(f.read(), b'content')
69+
70+
def test_overwrite_file_as_implicit_dir(self):
71+
target = os.path.join(self.testdir, 'test')
72+
self.create_file(target, b'content')
73+
with self.open(self.ar_with_implicit_dir) as ar:
74+
with self.assertRaises(FileNotFoundError if sys.platform == 'win32'
75+
else NotADirectoryError):
76+
self.extractall(ar)
77+
self.assertTrue(os.path.isfile(target))
78+
with open(target, 'rb') as f:
79+
self.assertEqual(f.read(), b'content')
80+
81+
@os_helper.skip_unless_symlink
82+
def test_overwrite_file_symlink_as_file(self):
83+
# XXX: It is potential security vulnerability.
84+
target = os.path.join(self.testdir, 'test')
85+
target2 = os.path.join(self.testdir, 'test2')
86+
self.create_file(target2, b'content')
87+
os.symlink('test2', target)
88+
with self.open(self.ar_with_file) as ar:
89+
self.extractall(ar)
90+
self.assertTrue(os.path.islink(target))
91+
self.assertTrue(os.path.isfile(target2))
92+
with open(target2, 'rb') as f:
93+
self.assertEqual(f.read(), b'newcontent')
94+
95+
@os_helper.skip_unless_symlink
96+
def test_overwrite_broken_file_symlink_as_file(self):
97+
# XXX: It is potential security vulnerability.
98+
target = os.path.join(self.testdir, 'test')
99+
target2 = os.path.join(self.testdir, 'test2')
100+
os.symlink('test2', target)
101+
with self.open(self.ar_with_file) as ar:
102+
self.extractall(ar)
103+
self.assertTrue(os.path.islink(target))
104+
self.assertTrue(os.path.isfile(target2))
105+
with open(target2, 'rb') as f:
106+
self.assertEqual(f.read(), b'newcontent')
107+
108+
@os_helper.skip_unless_symlink
109+
def test_overwrite_dir_symlink_as_dir(self):
110+
# XXX: It is potential security vulnerability.
111+
target = os.path.join(self.testdir, 'test')
112+
target2 = os.path.join(self.testdir, 'test2')
113+
os.mkdir(target2)
114+
os.symlink('test2', target, target_is_directory=True)
115+
with self.open(self.ar_with_dir) as ar:
116+
self.extractall(ar)
117+
self.assertTrue(os.path.islink(target))
118+
self.assertTrue(os.path.isdir(target2))
119+
120+
@os_helper.skip_unless_symlink
121+
def test_overwrite_dir_symlink_as_implicit_dir(self):
122+
# XXX: It is potential security vulnerability.
123+
target = os.path.join(self.testdir, 'test')
124+
target2 = os.path.join(self.testdir, 'test2')
125+
os.mkdir(target2)
126+
os.symlink('test2', target, target_is_directory=True)
127+
with self.open(self.ar_with_implicit_dir) as ar:
128+
self.extractall(ar)
129+
self.assertTrue(os.path.islink(target))
130+
self.assertTrue(os.path.isdir(target2))
131+
self.assertTrue(os.path.isfile(os.path.join(target2, 'file')))
132+
with open(os.path.join(target2, 'file'), 'rb') as f:
133+
self.assertEqual(f.read(), b'newcontent')
134+
135+
@os_helper.skip_unless_symlink
136+
def test_overwrite_broken_dir_symlink_as_dir(self):
137+
target = os.path.join(self.testdir, 'test')
138+
target2 = os.path.join(self.testdir, 'test2')
139+
os.symlink('test2', target, target_is_directory=True)
140+
with self.open(self.ar_with_dir) as ar:
141+
with self.assertRaises(FileExistsError):
142+
self.extractall(ar)
143+
self.assertTrue(os.path.islink(target))
144+
self.assertFalse(os.path.exists(target2))
145+
146+
@os_helper.skip_unless_symlink
147+
def test_overwrite_broken_dir_symlink_as_implicit_dir(self):
148+
target = os.path.join(self.testdir, 'test')
149+
target2 = os.path.join(self.testdir, 'test2')
150+
os.symlink('test2', target, target_is_directory=True)
151+
with self.open(self.ar_with_implicit_dir) as ar:
152+
with self.assertRaises(FileExistsError):
153+
self.extractall(ar)
154+
self.assertTrue(os.path.islink(target))
155+
self.assertFalse(os.path.exists(target2))

Lib/test/test_zipfile/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import os
2+
from test.support import load_package_tests
3+
4+
def load_tests(*args):
5+
return load_package_tests(os.path.dirname(__file__), *args)

Lib/test/test_zipfile/__main__.py

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import unittest
2+
3+
from . import load_tests # noqa: F401
4+
5+
6+
if __name__ == "__main__":
7+
unittest.main()

Lib/test/test_zipfile/_path/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import functools
2+
3+
4+
# from jaraco.functools 3.5.2
5+
def compose(*funcs):
6+
def compose_two(f1, f2):
7+
return lambda *args, **kwargs: f1(f2(*args, **kwargs))
8+
9+
return functools.reduce(compose_two, funcs)
+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import itertools
2+
from collections import deque
3+
from itertools import islice
4+
5+
6+
# from jaraco.itertools 6.3.0
7+
class Counter:
8+
"""
9+
Wrap an iterable in an object that stores the count of items
10+
that pass through it.
11+
12+
>>> items = Counter(range(20))
13+
>>> items.count
14+
0
15+
>>> values = list(items)
16+
>>> items.count
17+
20
18+
"""
19+
20+
def __init__(self, i):
21+
self.count = 0
22+
self.iter = zip(itertools.count(1), i)
23+
24+
def __iter__(self):
25+
return self
26+
27+
def __next__(self):
28+
self.count, result = next(self.iter)
29+
return result
30+
31+
32+
# from more_itertools v8.13.0
33+
def always_iterable(obj, base_type=(str, bytes)):
34+
if obj is None:
35+
return iter(())
36+
37+
if (base_type is not None) and isinstance(obj, base_type):
38+
return iter((obj,))
39+
40+
try:
41+
return iter(obj)
42+
except TypeError:
43+
return iter((obj,))
44+
45+
46+
# from more_itertools v9.0.0
47+
def consume(iterator, n=None):
48+
"""Advance *iterable* by *n* steps. If *n* is ``None``, consume it
49+
entirely.
50+
Efficiently exhausts an iterator without returning values. Defaults to
51+
consuming the whole iterator, but an optional second argument may be
52+
provided to limit consumption.
53+
>>> i = (x for x in range(10))
54+
>>> next(i)
55+
0
56+
>>> consume(i, 3)
57+
>>> next(i)
58+
4
59+
>>> consume(i)
60+
>>> next(i)
61+
Traceback (most recent call last):
62+
File "<stdin>", line 1, in <module>
63+
StopIteration
64+
If the iterator has fewer items remaining than the provided limit, the
65+
whole iterator will be consumed.
66+
>>> i = (x for x in range(3))
67+
>>> consume(i, 5)
68+
>>> next(i)
69+
Traceback (most recent call last):
70+
File "<stdin>", line 1, in <module>
71+
StopIteration
72+
"""
73+
# Use functions that consume iterators at C speed.
74+
if n is None:
75+
# feed the entire iterator into a zero-length deque
76+
deque(iterator, maxlen=0)
77+
else:
78+
# advance to the empty slice starting at position n
79+
next(islice(iterator, n, n), None)
+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import importlib
2+
import unittest
3+
4+
5+
def import_or_skip(name):
6+
try:
7+
return importlib.import_module(name)
8+
except ImportError: # pragma: no cover
9+
raise unittest.SkipTest(f'Unable to import {name}')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import types
2+
import functools
3+
4+
from ._itertools import always_iterable
5+
6+
7+
def parameterize(names, value_groups):
8+
"""
9+
Decorate a test method to run it as a set of subtests.
10+
11+
Modeled after pytest.parametrize.
12+
"""
13+
14+
def decorator(func):
15+
@functools.wraps(func)
16+
def wrapped(self):
17+
for values in value_groups:
18+
resolved = map(Invoked.eval, always_iterable(values))
19+
params = dict(zip(always_iterable(names), resolved))
20+
with self.subTest(**params):
21+
func(self, **params)
22+
23+
return wrapped
24+
25+
return decorator
26+
27+
28+
class Invoked(types.SimpleNamespace):
29+
"""
30+
Wrap a function to be invoked for each usage.
31+
"""
32+
33+
@classmethod
34+
def wrap(cls, func):
35+
return cls(func=func)
36+
37+
@classmethod
38+
def eval(cls, cand):
39+
return cand.func() if isinstance(cand, cls) else cand

0 commit comments

Comments
 (0)