Skip to content

Commit 707fbcf

Browse files
authored
Merge pull request #4812 from Masorubka1/test_dtrace
Update test_dtrace.py from Cpython v3.11.2
2 parents fea3cc2 + b7ab716 commit 707fbcf

File tree

1 file changed

+178
-0
lines changed

1 file changed

+178
-0
lines changed

Lib/test/test_dtrace.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import dis
2+
import os.path
3+
import re
4+
import subprocess
5+
import sys
6+
import types
7+
import unittest
8+
9+
from test import support
10+
from test.support import findfile
11+
12+
13+
if not support.has_subprocess_support:
14+
raise unittest.SkipTest("test module requires subprocess")
15+
16+
17+
def abspath(filename):
18+
return os.path.abspath(findfile(filename, subdir="dtracedata"))
19+
20+
21+
def normalize_trace_output(output):
22+
"""Normalize DTrace output for comparison.
23+
24+
DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
25+
are concatenated. So if the operating system moves our thread around, the
26+
straight result can be "non-causal". So we add timestamps to the probe
27+
firing, sort by that field, then strip it from the output"""
28+
29+
# When compiling with '--with-pydebug', strip '[# refs]' debug output.
30+
output = re.sub(r"\[[0-9]+ refs\]", "", output)
31+
try:
32+
result = [
33+
row.split("\t")
34+
for row in output.splitlines()
35+
if row and not row.startswith('#')
36+
]
37+
result.sort(key=lambda row: int(row[0]))
38+
result = [row[1] for row in result]
39+
return "\n".join(result)
40+
except (IndexError, ValueError):
41+
raise AssertionError(
42+
"tracer produced unparsable output:\n{}".format(output)
43+
)
44+
45+
46+
class TraceBackend:
47+
EXTENSION = None
48+
COMMAND = None
49+
COMMAND_ARGS = []
50+
51+
def run_case(self, name, optimize_python=None):
52+
actual_output = normalize_trace_output(self.trace_python(
53+
script_file=abspath(name + self.EXTENSION),
54+
python_file=abspath(name + ".py"),
55+
optimize_python=optimize_python))
56+
57+
with open(abspath(name + self.EXTENSION + ".expected")) as f:
58+
expected_output = f.read().rstrip()
59+
60+
return (expected_output, actual_output)
61+
62+
def generate_trace_command(self, script_file, subcommand=None):
63+
command = self.COMMAND + [script_file]
64+
if subcommand:
65+
command += ["-c", subcommand]
66+
return command
67+
68+
def trace(self, script_file, subcommand=None):
69+
command = self.generate_trace_command(script_file, subcommand)
70+
stdout, _ = subprocess.Popen(command,
71+
stdout=subprocess.PIPE,
72+
stderr=subprocess.STDOUT,
73+
universal_newlines=True).communicate()
74+
return stdout
75+
76+
def trace_python(self, script_file, python_file, optimize_python=None):
77+
python_flags = []
78+
if optimize_python:
79+
python_flags.extend(["-O"] * optimize_python)
80+
subcommand = " ".join([sys.executable] + python_flags + [python_file])
81+
return self.trace(script_file, subcommand)
82+
83+
def assert_usable(self):
84+
try:
85+
output = self.trace(abspath("assert_usable" + self.EXTENSION))
86+
output = output.strip()
87+
except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe:
88+
output = str(fnfe)
89+
if output != "probe: success":
90+
raise unittest.SkipTest(
91+
"{}(1) failed: {}".format(self.COMMAND[0], output)
92+
)
93+
94+
95+
class DTraceBackend(TraceBackend):
96+
EXTENSION = ".d"
97+
COMMAND = ["dtrace", "-q", "-s"]
98+
99+
100+
class SystemTapBackend(TraceBackend):
101+
EXTENSION = ".stp"
102+
COMMAND = ["stap", "-g"]
103+
104+
105+
class TraceTests:
106+
# unittest.TestCase options
107+
maxDiff = None
108+
109+
# TraceTests options
110+
backend = None
111+
optimize_python = 0
112+
113+
@classmethod
114+
def setUpClass(self):
115+
self.backend.assert_usable()
116+
117+
def run_case(self, name):
118+
actual_output, expected_output = self.backend.run_case(
119+
name, optimize_python=self.optimize_python)
120+
self.assertEqual(actual_output, expected_output)
121+
122+
def test_function_entry_return(self):
123+
self.run_case("call_stack")
124+
125+
def test_verify_call_opcodes(self):
126+
"""Ensure our call stack test hits all function call opcodes"""
127+
128+
opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
129+
130+
with open(abspath("call_stack.py")) as f:
131+
code_string = f.read()
132+
133+
def get_function_instructions(funcname):
134+
# Recompile with appropriate optimization setting
135+
code = compile(source=code_string,
136+
filename="<string>",
137+
mode="exec",
138+
optimize=self.optimize_python)
139+
140+
for c in code.co_consts:
141+
if isinstance(c, types.CodeType) and c.co_name == funcname:
142+
return dis.get_instructions(c)
143+
return []
144+
145+
for instruction in get_function_instructions('start'):
146+
opcodes.discard(instruction.opname)
147+
148+
self.assertEqual(set(), opcodes)
149+
150+
def test_gc(self):
151+
self.run_case("gc")
152+
153+
def test_line(self):
154+
self.run_case("line")
155+
156+
157+
class DTraceNormalTests(TraceTests, unittest.TestCase):
158+
backend = DTraceBackend()
159+
optimize_python = 0
160+
161+
162+
class DTraceOptimizedTests(TraceTests, unittest.TestCase):
163+
backend = DTraceBackend()
164+
optimize_python = 2
165+
166+
167+
class SystemTapNormalTests(TraceTests, unittest.TestCase):
168+
backend = SystemTapBackend()
169+
optimize_python = 0
170+
171+
172+
class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
173+
backend = SystemTapBackend()
174+
optimize_python = 2
175+
176+
177+
if __name__ == '__main__':
178+
unittest.main()

0 commit comments

Comments
 (0)