From ba07bc5211e76b9b372ef57e3d255e6918150416 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sun, 10 Sep 2023 00:23:00 +0200 Subject: [PATCH] gh-109162: Refactor libregrtest.runtest_mp * Add attributes to Regrtest and RunTests: * fail_env_changed * num_workers * Rename MultiprocessTestRunner to RunWorkers. Add num_workers parameter to RunWorkers constructor. Remove RunWorkers.ns attribute. * Rename TestWorkerProcess to WorkerThread. * get_running() now returns a string like: "running (...): ...". * Regrtest.action_run_tests() now selects the number of worker processes, instead of the command line parser. --- Lib/test/libregrtest/cmdline.py | 6 +-- Lib/test/libregrtest/main.py | 44 +++++++++++++-------- Lib/test/libregrtest/runtest.py | 1 + Lib/test/libregrtest/runtest_mp.py | 62 ++++++++++++++---------------- 4 files changed, 57 insertions(+), 56 deletions(-) diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py index 9afb13224975ee..2835546fc713cf 100644 --- a/Lib/test/libregrtest/cmdline.py +++ b/Lib/test/libregrtest/cmdline.py @@ -1,5 +1,5 @@ import argparse -import os +import os.path import shlex import sys from test.support import os_helper @@ -410,10 +410,6 @@ def _parse_args(args, **kwargs): if ns.timeout is not None: if ns.timeout <= 0: ns.timeout = None - if ns.use_mp is not None: - if ns.use_mp <= 0: - # Use all cores + extras for tests that like to sleep - ns.use_mp = 2 + (os.cpu_count() or 1) if ns.use: for a in ns.use: for r in a: diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 4066d06c98600e..1fa7b07a09d701 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -83,8 +83,18 @@ def __init__(self, ns: Namespace): self.fromfile: str | None = ns.fromfile self.starting_test: str | None = ns.start + # Run tests + if ns.use_mp is None: + num_workers = 0 # run sequentially + elif ns.use_mp <= 0: + num_workers = -1 # use the number of CPUs + else: + num_workers = ns.use_mp + 
self.num_workers: int = num_workers + # Options to run tests self.fail_fast: bool = ns.failfast + self.fail_env_changed: bool = ns.fail_env_changed self.forever: bool = ns.forever self.randomize: bool = ns.randomize self.random_seed: int | None = ns.random_seed @@ -150,7 +160,6 @@ def get_executed(self): | set(self.run_no_tests)) def accumulate_result(self, result, rerun=False): - fail_env_changed = self.ns.fail_env_changed test_name = result.test_name match result.state: @@ -167,7 +176,7 @@ def accumulate_result(self, result, rerun=False): case State.DID_NOT_RUN: self.run_no_tests.append(test_name) case _: - if result.is_failed(fail_env_changed): + if result.is_failed(self.fail_env_changed): self.bad.append(test_name) self.need_rerun.append(result) else: @@ -339,9 +348,8 @@ def get_rerun_match(self, rerun_list) -> FilterDict: def _rerun_failed_tests(self, need_rerun, runtests: RunTests): # Configure the runner to re-run tests - ns = self.ns - if ns.use_mp is None: - ns.use_mp = 1 + if self.num_workers == 0: + self.num_workers = 1 # Get tests to re-run tests = [result.test_name for result in need_rerun] @@ -363,7 +371,7 @@ def _rerun_failed_tests(self, need_rerun, runtests: RunTests): match_tests_dict=match_tests_dict, output_on_failure=False) self.set_tests(runtests) - self._run_tests_mp(runtests) + self._run_tests_mp(runtests, self.num_workers) return runtests def rerun_failed_tests(self, need_rerun, runtests: RunTests): @@ -471,7 +479,6 @@ def run_test(self, test_name: str, runtests: RunTests, tracer): def run_tests_sequentially(self, runtests): ns = self.ns coverage = ns.trace - fail_env_changed = ns.fail_env_changed if coverage: import trace @@ -503,7 +510,7 @@ def run_tests_sequentially(self, runtests): if module not in save_modules and module.startswith("test."): support.unload(module) - if result.must_stop(self.fail_fast, fail_env_changed): + if result.must_stop(self.fail_fast, self.fail_env_changed): break previous_test = str(result) @@ -564,12 +571,10 @@ 
def no_tests_run(self): self.environment_changed)) def get_tests_state(self): - fail_env_changed = self.ns.fail_env_changed - result = [] if self.bad: result.append("FAILURE") - elif fail_env_changed and self.environment_changed: + elif self.fail_env_changed and self.environment_changed: result.append("ENV CHANGED") elif self.no_tests_run(): result.append("NO TESTS RAN") @@ -585,8 +590,9 @@ def get_tests_state(self): result = '%s then %s' % (self.first_state, result) return result - def _run_tests_mp(self, runtests: RunTests) -> None: - from test.libregrtest.runtest_mp import run_tests_multiprocess + def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None: + from test.libregrtest.runtest_mp import RunWorkers + # If we're on windows and this is the parent runner (not a worker), # track the load average. if sys.platform == 'win32': @@ -600,7 +606,7 @@ def _run_tests_mp(self, runtests: RunTests) -> None: print(f'Failed to create WindowsLoadTracker: {error}') try: - run_tests_multiprocess(self, runtests) + RunWorkers(self, runtests, num_workers).run() finally: if self.win_load_tracker is not None: self.win_load_tracker.close() @@ -618,8 +624,8 @@ def set_tests(self, runtests: RunTests): def run_tests(self, runtests: RunTests): self.first_runtests = runtests self.set_tests(runtests) - if self.ns.use_mp: - self._run_tests_mp(runtests) + if self.num_workers: + self._run_tests_mp(runtests, self.num_workers) tracer = None else: tracer = self.run_tests_sequentially(runtests) @@ -843,7 +849,7 @@ def get_exitcode(self): exitcode = EXITCODE_BAD_TEST elif self.interrupted: exitcode = EXITCODE_INTERRUPTED - elif self.ns.fail_env_changed and self.environment_changed: + elif self.fail_env_changed and self.environment_changed: exitcode = EXITCODE_ENV_CHANGED elif self.no_tests_run(): exitcode = EXITCODE_NO_TESTS_RAN @@ -866,6 +872,10 @@ def action_run_tests(self): if self.randomize: print("Using random seed", self.random_seed) + if self.num_workers < 0: + # Use all 
cores + extras for tests that like to sleep + self.num_workers = 2 + (os.cpu_count() or 1) + runtests = RunTests( tuple(self.selected), fail_fast=self.fail_fast, diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py index dc574eda8b99f5..667701778d9a79 100644 --- a/Lib/test/libregrtest/runtest.py +++ b/Lib/test/libregrtest/runtest.py @@ -217,6 +217,7 @@ def get_rerun_match_tests(self) -> FilterTuple | None: class RunTests: tests: TestTuple fail_fast: bool = False + fail_env_changed: bool = False match_tests: FilterTuple | None = None ignore_tests: FilterTuple | None = None match_tests_dict: FilterDict | None = None diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index e4a9301656436b..28c05b5976e159 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -16,7 +16,6 @@ from test.support import os_helper from test.support import TestStats -from test.libregrtest.cmdline import Namespace from test.libregrtest.main import Regrtest from test.libregrtest.runtest import ( run_single_test, TestResult, State, PROGRESS_MIN_TIME, @@ -150,14 +149,13 @@ class ExitThread(Exception): pass -class TestWorkerProcess(threading.Thread): - def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None: +class WorkerThread(threading.Thread): + def __init__(self, worker_id: int, runner: "RunWorkers") -> None: super().__init__() self.worker_id = worker_id self.runtests = runner.runtests self.pending = runner.pending self.output = runner.output - self.ns = runner.ns self.timeout = runner.worker_timeout self.regrtest = runner.regrtest self.current_test_name = None @@ -167,7 +165,7 @@ def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None: self._stopped = False def __repr__(self) -> str: - info = [f'TestWorkerProcess #{self.worker_id}'] + info = [f'WorkerThread #{self.worker_id}'] if self.is_alive(): info.append("running") else: @@ -203,7 +201,7 @@ def 
_kill(self) -> None: else: popen.kill() except ProcessLookupError: - # popen.kill(): the process completed, the TestWorkerProcess thread + # popen.kill(): the process completed, the WorkerThread thread # read its exit status, but Popen.send_signal() read the returncode # just before Popen.wait() set returncode. pass @@ -362,7 +360,7 @@ def _runtest(self, test_name: str) -> MultiprocessResult: def run(self) -> None: fail_fast = self.runtests.fail_fast - fail_env_changed = self.ns.fail_env_changed + fail_env_changed = self.runtests.fail_env_changed while not self._stopped: try: try: @@ -394,10 +392,10 @@ def _wait_completed(self) -> None: f"{exc!r}") def wait_stopped(self, start_time: float) -> None: - # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop() + # bpo-38207: RunWorkers.stop_workers() called self.stop() # which killed the process. Sometimes, killing the process from the # main thread does not interrupt popen.communicate() in - # TestWorkerProcess thread. This loop with a timeout is a workaround + # WorkerThread thread. This loop with a timeout is a workaround # for that. 
# # Moreover, if this method fails to join the thread, it is likely @@ -417,7 +415,7 @@ def wait_stopped(self, start_time: float) -> None: break -def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]: +def get_running(workers: list[WorkerThread]) -> list[str]: running = [] for worker in workers: current_test_name = worker.current_test_name @@ -427,18 +425,17 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]: if dt >= PROGRESS_MIN_TIME: text = '%s (%s)' % (current_test_name, format_duration(dt)) running.append(text) - return running + if not running: + return None + return f"running ({len(running)}): {', '.join(running)}" -class MultiprocessTestRunner: - def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None: - ns = regrtest.ns - +class RunWorkers: + def __init__(self, regrtest: Regrtest, runtests: RunTests, num_workers: int) -> None: self.regrtest = regrtest + self.log = regrtest.log + self.num_workers = num_workers self.runtests = runtests - self.rerun = runtests.rerun - self.log = self.regrtest.log - self.ns = ns self.output: queue.Queue[QueueOutput] = queue.Queue() tests_iter = runtests.iter_tests() self.pending = MultiprocessIterator(tests_iter) @@ -453,9 +450,8 @@ def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None: self.workers = None def start_workers(self) -> None: - use_mp = self.ns.use_mp - self.workers = [TestWorkerProcess(index, self) - for index in range(1, use_mp + 1)] + self.workers = [WorkerThread(index, self) + for index in range(1, self.num_workers + 1)] msg = f"Run tests in parallel using {len(self.workers)} child processes" if self.timeout: msg += (" (timeout: %s, worker timeout: %s)" @@ -489,10 +485,11 @@ def _get_result(self) -> QueueOutput | None: except queue.Empty: pass - # display progress - running = get_running(self.workers) - if running and not pgo: - self.log('running: %s' % ', '.join(running)) + if not pgo: + # display progress + running = 
get_running(self.workers) + if running: + self.log(running) # all worker threads are done: consume pending results try: @@ -510,9 +507,10 @@ def display_result(self, mp_result: MultiprocessResult) -> None: text += ' (%s)' % mp_result.err_msg elif (result.duration >= PROGRESS_MIN_TIME and not pgo): text += ' (%s)' % format_duration(result.duration) - running = get_running(self.workers) - if running and not pgo: - text += ' -- running: %s' % ', '.join(running) + if not pgo: + running = get_running(self.workers) + if running: + text += f' -- {running}' self.regrtest.display_progress(self.test_index, text) def _process_result(self, item: QueueOutput) -> bool: @@ -537,9 +535,9 @@ def _process_result(self, item: QueueOutput) -> bool: return result - def run_tests(self) -> None: + def run(self) -> None: fail_fast = self.runtests.fail_fast - fail_env_changed = self.ns.fail_env_changed + fail_env_changed = self.runtests.fail_env_changed self.start_workers() @@ -566,10 +564,6 @@ def run_tests(self) -> None: self.stop_workers() -def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None: - MultiprocessTestRunner(regrtest, runtests).run_tests() - - class EncodeTestResult(json.JSONEncoder): """Encode a TestResult (sub)class object into a JSON dict."""