Optimize tests (parallel execution) #3019
@@ -12,6 +12,7 @@ docs/build/
 .mypy_cache/
 .incremental_checker_cache.json
 .cache
+.runtest_log.json

 # Packages
 *.egg
@@ -3,15 +3,18 @@
 This is used for running mypy tests.
 """

-from typing import Dict, List, Optional, Set, Tuple
+from typing import Dict, List, Optional, Set, Tuple, Any, Iterable

 import os
+from multiprocessing import cpu_count
 import pipes
 import re
 from subprocess import Popen, STDOUT
 import sys
 import tempfile
 import time
+import json
+from collections import defaultdict


 class WaiterError(Exception):
@@ -32,7 +35,7 @@ def __init__(self, name: str, args: List[str], *, cwd: str = None,

     def start(self) -> None:
         self.outfile = tempfile.TemporaryFile()
-        self.start_time = time.time()
+        self.start_time = time.perf_counter()
         self.process = Popen(self.args, cwd=self.cwd, env=self.env,
                              stdout=self.outfile, stderr=STDOUT)
         self.pid = self.process.pid
@@ -107,7 +110,11 @@ class Waiter:
       if not waiter.run():
           print('error')
     """
-    def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = []) -> None:
+    LOGSIZE = 50
+    FULL_LOG_FILENAME = '.runtest_log.json'
+
+    def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [],
+                 lf: bool = False, ff: bool = False) -> None:
         self.verbosity = verbosity
         self.queue = []  # type: List[LazySubprocess]
         # Index of next task to run in the queue.
@@ -117,21 +124,42 @@ def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [])
             try:
                 sched_getaffinity = os.sched_getaffinity
             except AttributeError:
-                limit = 2
+                # no support for affinity on OSX/Windows
+                limit = cpu_count()
             else:
                 # Note: only count CPUs we are allowed to use. It is a
                 # major mistake to count *all* CPUs on the machine.
                 limit = len(sched_getaffinity(0))
         self.limit = limit
+        self.lf = lf
+        self.ff = ff
         assert limit > 0
         self.xfail = set(xfail)
         self._note = None  # type: Noter
         self.times1 = {}  # type: Dict[str, float]
         self.times2 = {}  # type: Dict[str, float]
+        self.new_log = defaultdict(dict)  # type: Dict[str, Dict[str, float]]
+        self.sequential_tasks = set()  # type: Set[str]

-    def add(self, cmd: LazySubprocess) -> int:
+    def load_log_file(self) -> Optional[List[Dict[str, Dict[str, Any]]]]:
+        try:
+            # get the last log
+            with open(self.FULL_LOG_FILENAME) as fp:
+                test_log = json.load(fp)
+        except FileNotFoundError:
+            test_log = []
+        except json.JSONDecodeError:
+            print('corrupt test log file {}'.format(self.FULL_LOG_FILENAME), file=sys.stderr)
+            test_log = []
+        return test_log
+
+    def add(self, cmd: LazySubprocess, sequential: bool = False) -> int:
         rv = len(self.queue)
         if cmd.name in (task.name for task in self.queue):
             sys.exit('Duplicate test name: {}'.format(cmd.name))
         self.queue.append(cmd)
+        if sequential:
+            self.sequential_tasks.add(cmd.name)
         return rv

     def _start_next(self) -> None:
@@ -161,12 +189,14 @@ def _record_time(self, name: str, elapsed_time: float) -> None:

     def _poll_current(self) -> Tuple[int, int]:
         while True:
-            time.sleep(.05)
+            time.sleep(.01)
             for pid in self.current:
                 cmd = self.current[pid][1]
                 code = cmd.process.poll()
                 if code is not None:
-                    cmd.end_time = time.time()
+                    cmd.end_time = time.perf_counter()
+                    self.new_log['exit_code'][cmd.name] = code
+                    self.new_log['runtime'][cmd.name] = cmd.end_time - cmd.start_time
                     return pid, code

     def _wait_next(self) -> Tuple[List[str], int, int]:
@@ -239,22 +269,83 @@ def run(self) -> int:
         if self.verbosity == 0:
             self._note = Noter(len(self.queue))
         print('SUMMARY %d tasks selected' % len(self.queue))

+        def avg(lst: Iterable[float]) -> float:
+            valid_items = [item for item in lst if item is not None]
+            if not valid_items:
+                # we don't know how long a new task takes
+                # better err by putting it in front in case it is slow:
+                # a fast task in front hurts performance less than a slow task in the back
+                return float('inf')
[Review discussion on the line above]
Reviewer: 👍 But …
Author: No, should I move it out to module level? I just wasn't sure about polluting the module namespace with a somewhat customized averaging function (after all, it does ignore None values).
+            else:
+                return sum(valid_items) / len(valid_items)
+
+        logs = self.load_log_file()
+        if logs:
+            times = {cmd.name: avg(log['runtime'].get(cmd.name, None) for log in logs)
+                     for cmd in self.queue}
+
+            def sort_function(cmd: LazySubprocess) -> Tuple[Any, int, float]:
+                # longest tasks first
+                runtime = -times[cmd.name]
+                # sequential tasks go first by default
+                sequential = -(cmd.name in self.sequential_tasks)
+                if self.ff:
+                    # failed tasks first with -ff
+                    exit_code = -logs[-1]['exit_code'].get(cmd.name, 0)
+                    if not exit_code:
+                        # avoid interrupting parallel tasks with sequential in between
+                        # so either: seq failed, parallel failed, parallel passed, seq passed
+                        # or: parallel failed, seq failed, seq passed, parallel passed
+                        # I picked the first one arbitrarily, since no obvious pros/cons
+                        # in other words, among failed tasks, sequential should go before parallel,
+                        # and among successful tasks, sequential should go after parallel
+                        sequential = -sequential
+                else:
+                    # ignore exit code without -ff
+                    exit_code = 0
+                return exit_code, sequential, runtime
+            self.queue = sorted(self.queue, key=sort_function)
+            if self.lf:
+                self.queue = [cmd for cmd in self.queue
+                              if logs[-1]['exit_code'].get(cmd.name, 0)]
+
         sys.stdout.flush()
         # Failed tasks.
         all_failures = []  # type: List[str]
         # Number of test cases. Some tasks can involve multiple test cases.
         total_tests = 0
         # Number of failed test cases.
         total_failed_tests = 0
+        running_sequential_task = False
         while self.current or self.next < len(self.queue):
             while len(self.current) < self.limit and self.next < len(self.queue):
+                # only start next task if idle, or current and next tasks are both parallel
+                if running_sequential_task:
+                    break
+                if self.queue[self.next].name in self.sequential_tasks:
+                    if self.current:
+                        break
+                    else:
+                        running_sequential_task = True
                 self._start_next()
             fails, tests, test_fails = self._wait_next()
+            running_sequential_task = False
             all_failures += fails
             total_tests += tests
             total_failed_tests += test_fails
             if self.verbosity == 0:
                 self._note.clear()

+        if self.new_log:  # don't append empty log, it will corrupt the cache file
+            # log only LOGSIZE most recent tests
+            test_log = (self.load_log_file() + [self.new_log])[:self.LOGSIZE]
+            try:
+                with open(self.FULL_LOG_FILENAME, 'w') as fp:
+                    json.dump(test_log, fp, sort_keys=True, indent=4)
+            except Exception as e:
+                print('cannot save test log file:', e)
+
         if all_failures:
             summary = 'SUMMARY %d/%d tasks and %d/%d tests failed' % (
                 len(all_failures), len(self.queue), total_failed_tests, total_tests)
@@ -271,7 +362,6 @@ def run(self) -> int:
                 len(self.queue), total_tests))
             print('*** OK ***')
         sys.stdout.flush()
-
         return 0
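For reference, a single entry in .runtest_log.json would presumably look like the sketch below, judging from how self.new_log is filled in _poll_current(); the task names and numbers here are invented, not taken from a real run.

# Hypothetical shape of one entry in .runtest_log.json, inferred from how
# self.new_log is built above; task names and values are made up.
log_entry = {
    'exit_code': {'check file runtests.py': 0, 'lint': 1},
    'runtime': {'check file runtests.py': 212.7, 'lint': 4.3},
}
# The file itself holds a list of such entries (at most LOGSIZE of them),
# written with json.dump(test_log, fp, sort_keys=True, indent=4).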
Performance improvements by trimming the slowdown loops!
@ambv Actually, I'm only hoping so. On the positive side, with ~140 tasks, we're cutting down on the average delay of 0.05 sec per task, or ~7 sec for the current test suite (well, it's actually closer to half of that, since on average a task finishes in the middle of the wait interval). Spread across (say) 4 workers, it reduces total wall time by a quarter of that, or ~2 sec.
But OTOH, reducing the wait time also increases the number of context switches to the main process and the CPU load of the main process (i.e., the process that is waiting for the next task to finish); both would cause slowdown in the workers. I hope it's not enough to become noticeable at 10 ms wait time, given that a context switch is just a few microseconds, and the runtime of the loop seems to be pretty short too. But I'm too lazy to test it.
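A back-of-the-envelope sketch of that estimate, in the spirit of the comment above (the 140-task count, 4 workers, and the two polling intervals are taken from the discussion; nothing here is measured):

# Rough estimate of the polling overhead discussed above; assumptions only.
def polling_overhead(num_tasks: int, interval: float, workers: int) -> float:
    """Approximate wall-clock time spent waiting for finished tasks to be noticed."""
    total_delay = num_tasks * interval  # ~7 sec for 140 tasks at 0.05 sec
    # On average a task finishes halfway through an interval, so the real
    # figure is closer to half of this; delays also overlap across workers.
    return total_delay / workers

saved = polling_overhead(140, 0.05, 4) - polling_overhead(140, 0.01, 4)
print('estimated wall-time saving: ~%.1f sec' % saved)  # ~1.4 sec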