Skip to content

gh-109162: Refactor libregrtest.runtest_mp #109205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit on Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import argparse
import os
import os.path
import shlex
import sys
from test.support import os_helper
Expand Down Expand Up @@ -410,10 +410,6 @@ def _parse_args(args, **kwargs):
if ns.timeout is not None:
if ns.timeout <= 0:
ns.timeout = None
if ns.use_mp is not None:
if ns.use_mp <= 0:
# Use all cores + extras for tests that like to sleep
ns.use_mp = 2 + (os.cpu_count() or 1)
if ns.use:
for a in ns.use:
for r in a:
Expand Down
44 changes: 27 additions & 17 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,18 @@ def __init__(self, ns: Namespace):
self.fromfile: str | None = ns.fromfile
self.starting_test: str | None = ns.start

# Run tests
if ns.use_mp is None:
num_workers = 0 # run sequentially
elif ns.use_mp <= 0:
num_workers = -1 # use the number of CPUs
else:
num_workers = ns.use_mp
self.num_workers: int = num_workers

# Options to run tests
self.fail_fast: bool = ns.failfast
self.fail_env_changed: bool = ns.fail_env_changed
self.forever: bool = ns.forever
self.randomize: bool = ns.randomize
self.random_seed: int | None = ns.random_seed
Expand Down Expand Up @@ -150,7 +160,6 @@ def get_executed(self):
| set(self.run_no_tests))

def accumulate_result(self, result, rerun=False):
fail_env_changed = self.ns.fail_env_changed
test_name = result.test_name

match result.state:
Expand All @@ -167,7 +176,7 @@ def accumulate_result(self, result, rerun=False):
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
if result.is_failed(fail_env_changed):
if result.is_failed(self.fail_env_changed):
self.bad.append(test_name)
self.need_rerun.append(result)
else:
Expand Down Expand Up @@ -339,9 +348,8 @@ def get_rerun_match(self, rerun_list) -> FilterDict:

def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
# Configure the runner to re-run tests
ns = self.ns
if ns.use_mp is None:
ns.use_mp = 1
if self.num_workers == 0:
self.num_workers = 1

# Get tests to re-run
tests = [result.test_name for result in need_rerun]
Expand All @@ -363,7 +371,7 @@ def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
match_tests_dict=match_tests_dict,
output_on_failure=False)
self.set_tests(runtests)
self._run_tests_mp(runtests)
self._run_tests_mp(runtests, self.num_workers)
return runtests

def rerun_failed_tests(self, need_rerun, runtests: RunTests):
Expand Down Expand Up @@ -471,7 +479,6 @@ def run_test(self, test_name: str, runtests: RunTests, tracer):
def run_tests_sequentially(self, runtests):
ns = self.ns
coverage = ns.trace
fail_env_changed = ns.fail_env_changed

if coverage:
import trace
Expand Down Expand Up @@ -503,7 +510,7 @@ def run_tests_sequentially(self, runtests):
if module not in save_modules and module.startswith("test."):
support.unload(module)

if result.must_stop(self.fail_fast, fail_env_changed):
if result.must_stop(self.fail_fast, self.fail_env_changed):
break

previous_test = str(result)
Expand Down Expand Up @@ -564,12 +571,10 @@ def no_tests_run(self):
self.environment_changed))

def get_tests_state(self):
fail_env_changed = self.ns.fail_env_changed

result = []
if self.bad:
result.append("FAILURE")
elif fail_env_changed and self.environment_changed:
elif self.fail_env_changed and self.environment_changed:
result.append("ENV CHANGED")
elif self.no_tests_run():
result.append("NO TESTS RAN")
Expand All @@ -585,8 +590,9 @@ def get_tests_state(self):
result = '%s then %s' % (self.first_state, result)
return result

def _run_tests_mp(self, runtests: RunTests) -> None:
from test.libregrtest.runtest_mp import run_tests_multiprocess
def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
from test.libregrtest.runtest_mp import RunWorkers

# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32':
Expand All @@ -600,7 +606,7 @@ def _run_tests_mp(self, runtests: RunTests) -> None:
print(f'Failed to create WindowsLoadTracker: {error}')

try:
run_tests_multiprocess(self, runtests)
RunWorkers(self, runtests, num_workers).run()
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
Expand All @@ -618,8 +624,8 @@ def set_tests(self, runtests: RunTests):
def run_tests(self, runtests: RunTests):
self.first_runtests = runtests
self.set_tests(runtests)
if self.ns.use_mp:
self._run_tests_mp(runtests)
if self.num_workers:
self._run_tests_mp(runtests, self.num_workers)
tracer = None
else:
tracer = self.run_tests_sequentially(runtests)
Expand Down Expand Up @@ -843,7 +849,7 @@ def get_exitcode(self):
exitcode = EXITCODE_BAD_TEST
elif self.interrupted:
exitcode = EXITCODE_INTERRUPTED
elif self.ns.fail_env_changed and self.environment_changed:
elif self.fail_env_changed and self.environment_changed:
exitcode = EXITCODE_ENV_CHANGED
elif self.no_tests_run():
exitcode = EXITCODE_NO_TESTS_RAN
Expand All @@ -866,6 +872,10 @@ def action_run_tests(self):
if self.randomize:
print("Using random seed", self.random_seed)

if self.num_workers < 0:
# Use all cores + extras for tests that like to sleep
self.num_workers = 2 + (os.cpu_count() or 1)

runtests = RunTests(
tuple(self.selected),
fail_fast=self.fail_fast,
Expand Down
1 change: 1 addition & 0 deletions Lib/test/libregrtest/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def get_rerun_match_tests(self) -> FilterTuple | None:
class RunTests:
tests: TestTuple
fail_fast: bool = False
fail_env_changed: bool = False
match_tests: FilterTuple | None = None
ignore_tests: FilterTuple | None = None
match_tests_dict: FilterDict | None = None
Expand Down
62 changes: 28 additions & 34 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from test.support import os_helper
from test.support import TestStats

from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
run_single_test, TestResult, State, PROGRESS_MIN_TIME,
Expand Down Expand Up @@ -150,14 +149,13 @@ class ExitThread(Exception):
pass


class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
self.worker_id = worker_id
self.runtests = runner.runtests
self.pending = runner.pending
self.output = runner.output
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.current_test_name = None
Expand All @@ -167,7 +165,7 @@ def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
self._stopped = False

def __repr__(self) -> str:
info = [f'TestWorkerProcess #{self.worker_id}']
info = [f'WorkerThread #{self.worker_id}']
if self.is_alive():
info.append("running")
else:
Expand Down Expand Up @@ -203,7 +201,7 @@ def _kill(self) -> None:
else:
popen.kill()
except ProcessLookupError:
# popen.kill(): the process completed, the TestWorkerProcess thread
# popen.kill(): the process completed, the WorkerThread thread
# read its exit status, but Popen.send_signal() read the returncode
# just before Popen.wait() set returncode.
pass
Expand Down Expand Up @@ -362,7 +360,7 @@ def _runtest(self, test_name: str) -> MultiprocessResult:

def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.ns.fail_env_changed
fail_env_changed = self.runtests.fail_env_changed
while not self._stopped:
try:
try:
Expand Down Expand Up @@ -394,10 +392,10 @@ def _wait_completed(self) -> None:
f"{exc!r}")

def wait_stopped(self, start_time: float) -> None:
# bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
# bpo-38207: RunWorkers.stop_workers() called self.stop()
# which killed the process. Sometimes, killing the process from the
# main thread does not interrupt popen.communicate() in
# TestWorkerProcess thread. This loop with a timeout is a workaround
# WorkerThread thread. This loop with a timeout is a workaround
# for that.
#
# Moreover, if this method fails to join the thread, it is likely
Expand All @@ -417,7 +415,7 @@ def wait_stopped(self, start_time: float) -> None:
break


def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
def get_running(workers: list[WorkerThread]) -> list[str]:
running = []
for worker in workers:
current_test_name = worker.current_test_name
Expand All @@ -427,18 +425,17 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
if not running:
return None
return f"running ({len(running)}): {', '.join(running)}"


class MultiprocessTestRunner:
def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
ns = regrtest.ns

class RunWorkers:
def __init__(self, regrtest: Regrtest, runtests: RunTests, num_workers: int) -> None:
self.regrtest = regrtest
self.log = regrtest.log
self.num_workers = num_workers
self.runtests = runtests
self.rerun = runtests.rerun
self.log = self.regrtest.log
self.ns = ns
self.output: queue.Queue[QueueOutput] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
Expand All @@ -453,9 +450,8 @@ def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
self.workers = None

def start_workers(self) -> None:
use_mp = self.ns.use_mp
self.workers = [TestWorkerProcess(index, self)
for index in range(1, use_mp + 1)]
self.workers = [WorkerThread(index, self)
for index in range(1, self.num_workers + 1)]
msg = f"Run tests in parallel using {len(self.workers)} child processes"
if self.timeout:
msg += (" (timeout: %s, worker timeout: %s)"
Expand Down Expand Up @@ -489,10 +485,11 @@ def _get_result(self) -> QueueOutput | None:
except queue.Empty:
pass

# display progress
running = get_running(self.workers)
if running and not pgo:
self.log('running: %s' % ', '.join(running))
if not pgo:
# display progress
running = get_running(self.workers)
if running:
self.log(running)

# all worker threads are done: consume pending results
try:
Expand All @@ -510,9 +507,10 @@ def display_result(self, mp_result: MultiprocessResult) -> None:
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
if running and not pgo:
text += ' -- running: %s' % ', '.join(running)
if not pgo:
running = get_running(self.workers)
if running:
text += f' -- {running}'
self.regrtest.display_progress(self.test_index, text)

def _process_result(self, item: QueueOutput) -> bool:
Expand All @@ -537,9 +535,9 @@ def _process_result(self, item: QueueOutput) -> bool:

return result

def run_tests(self) -> None:
def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.ns.fail_env_changed
fail_env_changed = self.runtests.fail_env_changed

self.start_workers()

Expand All @@ -566,10 +564,6 @@ def run_tests(self) -> None:
self.stop_workers()


def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
MultiprocessTestRunner(regrtest, runtests).run_tests()


class EncodeTestResult(json.JSONEncoder):
"""Encode a TestResult (sub)class object into a JSON dict."""

Expand Down