Regrtest upgrade #5752

Draft: wants to merge 4 commits into main
2 changes: 1 addition & 1 deletion Lib/test/__main__.py
@@ -1,2 +1,2 @@
-from test.libregrtest import main
+from test.libregrtest.main import main
 main()
5 changes: 0 additions & 5 deletions Lib/test/libregrtest/__init__.py
@@ -1,5 +0,0 @@
-# We import importlib *ASAP* in order to test #15386
-import importlib
-
-from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES
-from test.libregrtest.main import main
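
Taken together with the change to Lib/test/__main__.py above, the libregrtest package no longer re-exports anything from its __init__; callers must import from the submodules directly. A minimal before/after sketch (illustrative, not part of the diff):

# Before this PR: main was re-exported by the package __init__
# from test.libregrtest import main

# After this PR: import main from its submodule directly
from test.libregrtest.main import main

main()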
261 changes: 205 additions & 56 deletions Lib/test/libregrtest/cmdline.py

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions Lib/test/libregrtest/filter.py
@@ -0,0 +1,77 @@
import itertools
import operator
import re


# By default, don't filter tests
_test_matchers = ()
_test_patterns = ()


def match_test(test):
# Function used by support.run_unittest() and regrtest --list-cases
result = False
for matcher, result in reversed(_test_matchers):
if matcher(test.id()):
return result
return not result


def _is_full_match_test(pattern):
    # If a pattern contains at least one dot, it's considered
    # a full test identifier.
# Example: 'test.test_os.FileTests.test_access'.
#
    # Patterns that contain fnmatch wildcards ('*', '?', '[...]'
    # or '[!...]') are not full identifiers: for example, 'test_access*'.
return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def get_match_tests():
global _test_patterns
return _test_patterns


def set_match_tests(patterns):
global _test_matchers, _test_patterns

if not patterns:
_test_matchers = ()
_test_patterns = ()
else:
itemgetter = operator.itemgetter
patterns = tuple(patterns)
if patterns != _test_patterns:
_test_matchers = [
(_compile_match_function(map(itemgetter(0), it)), result)
for result, it in itertools.groupby(patterns, itemgetter(1))
]
_test_patterns = patterns


def _compile_match_function(patterns):
patterns = list(patterns)

if all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifiers.
# The test.bisect_cmd utility only uses such full test identifiers.
return set(patterns).__contains__
else:
import fnmatch
regex = '|'.join(map(fnmatch.translate, patterns))
# The search *is* case sensitive on purpose:
# don't use flags=re.IGNORECASE
regex_match = re.compile(regex).match

def match_test_regex(test_id, regex_match=regex_match):
if regex_match(test_id):
# The regex matches the whole identifier, for example
# 'test.test_os.FileTests.test_access'.
return True
else:
# Try to match parts of the test identifier.
# For example, split 'test.test_os.FileTests.test_access'
# into: 'test', 'test_os', 'FileTests' and 'test_access'.
return any(map(regex_match, test_id.split(".")))

return match_test_regex
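
A hedged usage sketch of the new filter module (FakeTest is a hypothetical stub; the real callers pass unittest.TestCase instances, and only an id() method is needed). Patterns are (pattern, result) pairs where result=True includes and result=False excludes; later patterns take precedence, and an identifier matching no pattern gets the negation of the first pattern's result:

from test.libregrtest.filter import set_match_tests, match_test

class FakeTest:
    # Hypothetical stub: match_test() only calls test.id()
    def __init__(self, test_id):
        self._id = test_id

    def id(self):
        return self._id

# Include only tests with a 'test_chmod' component in their identifier:
set_match_tests([('test_chmod', True)])
match_test(FakeTest('test.test_os.FileTests.test_chmod'))   # True
match_test(FakeTest('test.test_os.FileTests.test_access'))  # False

# Exclude one fully qualified test; everything else stays included:
set_match_tests([('test.test_os.FileTests.test_access', False)])
match_test(FakeTest('test.test_os.FileTests.test_access'))  # False
match_test(FakeTest('test.test_os.FileTests.test_chmod'))   # True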
110 changes: 110 additions & 0 deletions Lib/test/libregrtest/findtests.py
@@ -0,0 +1,110 @@
import os
import sys
import unittest
from collections.abc import Container

from test import support

from .filter import match_test, set_match_tests
from .utils import (
StrPath, TestName, TestTuple, TestList, TestFilter,
abs_module_name, count, printlist)


# If these test directories are encountered recurse into them and treat each
# "test_*.py" file or each sub-directory as a separate test module. This can
# increase parallelism.
#
# Beware: this can't be done for arbitrary directories with sub-tests, as
# the package __init__.py may do things that alter which tests are run.
SPLITTESTDIRS: set[TestName] = {
"test_asyncio",
"test_concurrent_futures",
"test_doctests",
"test_future_stmt",
"test_gdb",
"test_inspect",
"test_pydoc",
"test_multiprocessing_fork",
"test_multiprocessing_forkserver",
"test_multiprocessing_spawn",
}


def findtestdir(path: StrPath | None = None) -> StrPath:
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir


def findtests(*, testdir: StrPath | None = None, exclude: Container[str] = (),
split_test_dirs: set[TestName] = SPLITTESTDIRS,
base_mod: str = "") -> TestList:
"""Return a list of all applicable test modules."""
testdir = findtestdir(testdir)
tests = []
for name in os.listdir(testdir):
mod, ext = os.path.splitext(name)
if (not mod.startswith("test_")) or (mod in exclude):
continue
if base_mod:
fullname = f"{base_mod}.{mod}"
else:
fullname = mod
if fullname in split_test_dirs:
subdir = os.path.join(testdir, mod)
if not base_mod:
fullname = f"test.{mod}"
tests.extend(findtests(testdir=subdir, exclude=exclude,
split_test_dirs=split_test_dirs,
base_mod=fullname))
elif ext in (".py", ""):
tests.append(fullname)
return sorted(tests)


def split_test_packages(tests, *, testdir: StrPath | None = None,
exclude: Container[str] = (),
split_test_dirs=SPLITTESTDIRS) -> list[TestName]:
testdir = findtestdir(testdir)
splitted = []
for name in tests:
if name in split_test_dirs:
subdir = os.path.join(testdir, name)
splitted.extend(findtests(testdir=subdir, exclude=exclude,
split_test_dirs=split_test_dirs,
base_mod=name))
else:
splitted.append(name)
return splitted


def _list_cases(suite: unittest.TestSuite) -> None:
for test in suite:
if isinstance(test, unittest.loader._FailedTest): # type: ignore[attr-defined]
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
elif isinstance(test, unittest.TestCase):
if match_test(test):
print(test.id())

def list_cases(tests: TestTuple, *,
match_tests: TestFilter | None = None,
test_dir: StrPath | None = None) -> None:
support.verbose = False
set_match_tests(match_tests)

skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
_list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)

if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
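
A hedged usage sketch of the discovery helpers (assumes a CPython checkout where the test package is importable; the example module names are illustrative):

from test.libregrtest.findtests import findtests, split_test_packages

# Every discoverable test module; directories listed in SPLITTESTDIRS
# are expanded into one entry per sub-module to increase parallelism.
all_tests = findtests()

# Expand split directories only within an explicit selection:
selected = split_test_packages(['test_os', 'test_asyncio'])
# e.g. ['test_os', 'test_asyncio.test_base_events', ...]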
89 changes: 89 additions & 0 deletions Lib/test/libregrtest/logger.py
@@ -0,0 +1,89 @@
import os
import time

from test.support import MS_WINDOWS
from .results import TestResults
from .runtests import RunTests
from .utils import print_warning

if MS_WINDOWS:
from .win_utils import WindowsLoadTracker


class Logger:
def __init__(self, results: TestResults, quiet: bool, pgo: bool):
self.start_time = time.perf_counter()
self.test_count_text = ''
self.test_count_width = 3
self.win_load_tracker: WindowsLoadTracker | None = None
self._results: TestResults = results
self._quiet: bool = quiet
self._pgo: bool = pgo

def log(self, line: str = '') -> None:
empty = not line

# add the system load prefix: "load avg: 1.80 "
load_avg = self.get_load_avg()
if load_avg is not None:
line = f"load avg: {load_avg:.2f} {line}"

# add the timestamp prefix: "0:01:05 "
log_time = time.perf_counter() - self.start_time

mins, secs = divmod(int(log_time), 60)
hours, mins = divmod(mins, 60)
formatted_log_time = "%d:%02d:%02d" % (hours, mins, secs)

line = f"{formatted_log_time} {line}"
if empty:
line = line[:-1]

print(line, flush=True)

def get_load_avg(self) -> float | None:
if hasattr(os, 'getloadavg'):
try:
return os.getloadavg()[0]
except OSError:
pass
if self.win_load_tracker is not None:
return self.win_load_tracker.getloadavg()
return None

def display_progress(self, test_index: int, text: str) -> None:
if self._quiet:
return
results = self._results

# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
fails = len(results.bad) + len(results.env_changed)
if fails and not self._pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")

def set_tests(self, runtests: RunTests) -> None:
if runtests.forever:
self.test_count_text = ''
self.test_count_width = 3
else:
self.test_count_text = '/{}'.format(len(runtests.tests))
self.test_count_width = len(self.test_count_text) - 1

def start_load_tracker(self) -> None:
if not MS_WINDOWS:
return

try:
self.win_load_tracker = WindowsLoadTracker()
except PermissionError as error:
# Standard accounts may not have access to the performance
# counters.
print_warning(f'Failed to create WindowsLoadTracker: {error}')

def stop_load_tracker(self) -> None:
if self.win_load_tracker is None:
return
self.win_load_tracker.close()
self.win_load_tracker = None
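
A standalone sketch of the prefix formatting used by Logger.log() and display_progress() above (the elapsed time and load average are made-up values):

# 3725 elapsed seconds become the "H:MM:SS" timestamp prefix
log_time = 3725
mins, secs = divmod(int(log_time), 60)
hours, mins = divmod(mins, 60)
prefix = "%d:%02d:%02d" % (hours, mins, secs)

# "[ 51/405/1]" means test 51 of 405, with 1 failure so far
print(f"{prefix} load avg: 1.80 [ 51/405/1] test_tcl passed")
# -> 1:02:05 load avg: 1.80 [ 51/405/1] test_tcl passed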