Skip to content

Commit f0bcad7

Browse files
hbinayouknowone
authored andcommitted
Added test_audit
This test is currently noop because RustPython does not have sys.audit? Signed-off-by: Hanif Ariffin <hanif.ariffin.4326@gmail.com>
1 parent 57a83db commit f0bcad7

File tree

1 file changed

+318
-0
lines changed

1 file changed

+318
-0
lines changed

Lib/test/test_audit.py

+318
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
"""Tests for sys.audit and sys.addaudithook
2+
"""
3+
4+
import subprocess
5+
import sys
6+
import unittest
7+
from test import support
8+
from test.support import import_helper
9+
from test.support import os_helper
10+
11+
12+
if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
13+
raise unittest.SkipTest("test only relevant when sys.audit is available")
14+
15+
AUDIT_TESTS_PY = support.findfile("audit-tests.py")
16+
17+
18+
class AuditTest(unittest.TestCase):
19+
maxDiff = None
20+
21+
@support.requires_subprocess()
22+
def run_test_in_subprocess(self, *args):
23+
with subprocess.Popen(
24+
[sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
25+
encoding="utf-8",
26+
stdout=subprocess.PIPE,
27+
stderr=subprocess.PIPE,
28+
) as p:
29+
p.wait()
30+
return p, p.stdout.read(), p.stderr.read()
31+
32+
def do_test(self, *args):
33+
proc, stdout, stderr = self.run_test_in_subprocess(*args)
34+
35+
sys.stdout.write(stdout)
36+
sys.stderr.write(stderr)
37+
if proc.returncode:
38+
self.fail(stderr)
39+
40+
def run_python(self, *args, expect_stderr=False):
41+
events = []
42+
proc, stdout, stderr = self.run_test_in_subprocess(*args)
43+
if not expect_stderr or support.verbose:
44+
sys.stderr.write(stderr)
45+
return (
46+
proc.returncode,
47+
[line.strip().partition(" ") for line in stdout.splitlines()],
48+
stderr,
49+
)
50+
51+
def test_basic(self):
52+
self.do_test("test_basic")
53+
54+
def test_block_add_hook(self):
55+
self.do_test("test_block_add_hook")
56+
57+
def test_block_add_hook_baseexception(self):
58+
self.do_test("test_block_add_hook_baseexception")
59+
60+
def test_marshal(self):
61+
import_helper.import_module("marshal")
62+
63+
self.do_test("test_marshal")
64+
65+
def test_pickle(self):
66+
import_helper.import_module("pickle")
67+
68+
self.do_test("test_pickle")
69+
70+
def test_monkeypatch(self):
71+
self.do_test("test_monkeypatch")
72+
73+
def test_open(self):
74+
self.do_test("test_open", os_helper.TESTFN)
75+
76+
def test_cantrace(self):
77+
self.do_test("test_cantrace")
78+
79+
def test_mmap(self):
80+
self.do_test("test_mmap")
81+
82+
def test_excepthook(self):
83+
returncode, events, stderr = self.run_python("test_excepthook")
84+
if not returncode:
85+
self.fail(f"Expected fatal exception\n{stderr}")
86+
87+
self.assertSequenceEqual(
88+
[("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
89+
)
90+
91+
def test_unraisablehook(self):
92+
import_helper.import_module("_testcapi")
93+
returncode, events, stderr = self.run_python("test_unraisablehook")
94+
if returncode:
95+
self.fail(stderr)
96+
97+
self.assertEqual(events[0][0], "sys.unraisablehook")
98+
self.assertEqual(
99+
events[0][2],
100+
"RuntimeError('nonfatal-error') Exception ignored for audit hook test",
101+
)
102+
103+
def test_winreg(self):
104+
import_helper.import_module("winreg")
105+
returncode, events, stderr = self.run_python("test_winreg")
106+
if returncode:
107+
self.fail(stderr)
108+
109+
self.assertEqual(events[0][0], "winreg.OpenKey")
110+
self.assertEqual(events[1][0], "winreg.OpenKey/result")
111+
expected = events[1][2]
112+
self.assertTrue(expected)
113+
self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
114+
self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
115+
self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
116+
117+
def test_socket(self):
118+
import_helper.import_module("socket")
119+
returncode, events, stderr = self.run_python("test_socket")
120+
if returncode:
121+
self.fail(stderr)
122+
123+
if support.verbose:
124+
print(*events, sep='\n')
125+
self.assertEqual(events[0][0], "socket.gethostname")
126+
self.assertEqual(events[1][0], "socket.__new__")
127+
self.assertEqual(events[2][0], "socket.bind")
128+
self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
129+
130+
def test_gc(self):
131+
returncode, events, stderr = self.run_python("test_gc")
132+
if returncode:
133+
self.fail(stderr)
134+
135+
if support.verbose:
136+
print(*events, sep='\n')
137+
self.assertEqual(
138+
[event[0] for event in events],
139+
["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
140+
)
141+
142+
143+
@support.requires_resource('network')
144+
def test_http(self):
145+
import_helper.import_module("http.client")
146+
returncode, events, stderr = self.run_python("test_http_client")
147+
if returncode:
148+
self.fail(stderr)
149+
150+
if support.verbose:
151+
print(*events, sep='\n')
152+
self.assertEqual(events[0][0], "http.client.connect")
153+
self.assertEqual(events[0][2], "www.python.org 80")
154+
self.assertEqual(events[1][0], "http.client.send")
155+
if events[1][2] != '[cannot send]':
156+
self.assertIn('HTTP', events[1][2])
157+
158+
159+
def test_sqlite3(self):
160+
sqlite3 = import_helper.import_module("sqlite3")
161+
returncode, events, stderr = self.run_python("test_sqlite3")
162+
if returncode:
163+
self.fail(stderr)
164+
165+
if support.verbose:
166+
print(*events, sep='\n')
167+
actual = [ev[0] for ev in events]
168+
expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
169+
170+
if hasattr(sqlite3.Connection, "enable_load_extension"):
171+
expected += [
172+
"sqlite3.enable_load_extension",
173+
"sqlite3.load_extension",
174+
]
175+
self.assertEqual(actual, expected)
176+
177+
178+
def test_sys_getframe(self):
179+
returncode, events, stderr = self.run_python("test_sys_getframe")
180+
if returncode:
181+
self.fail(stderr)
182+
183+
if support.verbose:
184+
print(*events, sep='\n')
185+
actual = [(ev[0], ev[2]) for ev in events]
186+
expected = [("sys._getframe", "test_sys_getframe")]
187+
188+
self.assertEqual(actual, expected)
189+
190+
def test_sys_getframemodulename(self):
191+
returncode, events, stderr = self.run_python("test_sys_getframemodulename")
192+
if returncode:
193+
self.fail(stderr)
194+
195+
if support.verbose:
196+
print(*events, sep='\n')
197+
actual = [(ev[0], ev[2]) for ev in events]
198+
expected = [("sys._getframemodulename", "0")]
199+
200+
self.assertEqual(actual, expected)
201+
202+
203+
def test_threading(self):
204+
returncode, events, stderr = self.run_python("test_threading")
205+
if returncode:
206+
self.fail(stderr)
207+
208+
if support.verbose:
209+
print(*events, sep='\n')
210+
actual = [(ev[0], ev[2]) for ev in events]
211+
expected = [
212+
("_thread.start_new_thread", "(<test_func>, (), None)"),
213+
("test.test_func", "()"),
214+
("_thread.start_joinable_thread", "(<test_func>, 1, None)"),
215+
("test.test_func", "()"),
216+
]
217+
218+
self.assertEqual(actual, expected)
219+
220+
221+
def test_wmi_exec_query(self):
222+
import_helper.import_module("_wmi")
223+
returncode, events, stderr = self.run_python("test_wmi_exec_query")
224+
if returncode:
225+
self.fail(stderr)
226+
227+
if support.verbose:
228+
print(*events, sep='\n')
229+
actual = [(ev[0], ev[2]) for ev in events]
230+
expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")]
231+
232+
self.assertEqual(actual, expected)
233+
234+
def test_syslog(self):
235+
syslog = import_helper.import_module("syslog")
236+
237+
returncode, events, stderr = self.run_python("test_syslog")
238+
if returncode:
239+
self.fail(stderr)
240+
241+
if support.verbose:
242+
print('Events:', *events, sep='\n ')
243+
244+
self.assertSequenceEqual(
245+
events,
246+
[('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
247+
('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
248+
('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
249+
('syslog.closelog', '', ''),
250+
('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
251+
('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
252+
('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
253+
('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
254+
('syslog.closelog', '', '')]
255+
)
256+
257+
def test_not_in_gc(self):
258+
returncode, _, stderr = self.run_python("test_not_in_gc")
259+
if returncode:
260+
self.fail(stderr)
261+
262+
def test_time(self):
263+
returncode, events, stderr = self.run_python("test_time", "print")
264+
if returncode:
265+
self.fail(stderr)
266+
267+
if support.verbose:
268+
print(*events, sep='\n')
269+
270+
actual = [(ev[0], ev[2]) for ev in events]
271+
expected = [("time.sleep", "0"),
272+
("time.sleep", "0.0625"),
273+
("time.sleep", "-1")]
274+
275+
self.assertEqual(actual, expected)
276+
277+
def test_time_fail(self):
278+
returncode, events, stderr = self.run_python("test_time", "fail",
279+
expect_stderr=True)
280+
self.assertNotEqual(returncode, 0)
281+
self.assertIn('hook failed', stderr.splitlines()[-1])
282+
283+
def test_sys_monitoring_register_callback(self):
284+
returncode, events, stderr = self.run_python("test_sys_monitoring_register_callback")
285+
if returncode:
286+
self.fail(stderr)
287+
288+
if support.verbose:
289+
print(*events, sep='\n')
290+
actual = [(ev[0], ev[2]) for ev in events]
291+
expected = [("sys.monitoring.register_callback", "(None,)")]
292+
293+
self.assertEqual(actual, expected)
294+
295+
def test_winapi_createnamedpipe(self):
296+
winapi = import_helper.import_module("_winapi")
297+
298+
pipe_name = r"\\.\pipe\LOCAL\test_winapi_createnamed_pipe"
299+
returncode, events, stderr = self.run_python("test_winapi_createnamedpipe", pipe_name)
300+
if returncode:
301+
self.fail(stderr)
302+
303+
if support.verbose:
304+
print(*events, sep='\n')
305+
actual = [(ev[0], ev[2]) for ev in events]
306+
expected = [("_winapi.CreateNamedPipe", f"({pipe_name!r}, 3, 8)")]
307+
308+
self.assertEqual(actual, expected)
309+
310+
def test_assert_unicode(self):
311+
# See gh-126018
312+
returncode, _, stderr = self.run_python("test_assert_unicode")
313+
if returncode:
314+
self.fail(stderr)
315+
316+
317+
if __name__ == "__main__":
318+
unittest.main()

0 commit comments

Comments
 (0)