Skip to content

[3.7] [3.8] bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550) (GH-16560) #16562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,8 @@ def accumulate_result(self, result, rerun=False):
print(xml_data, file=sys.__stderr__)
raise

def display_progress(self, test_index, text):
if self.ns.quiet:
return

# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count}"
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
line = f"{line}/{fails}"
line = f"[{line}] {text}"
def log(self, line=''):
empty = not line

# add the system load prefix: "load avg: 1.80 "
load_avg = self.getloadavg()
Expand All @@ -158,8 +150,23 @@ def display_progress(self, test_index, text):
test_time = time.monotonic() - self.start_time
test_time = datetime.timedelta(seconds=int(test_time))
line = f"{test_time} {line}"

if empty:
line = line[:-1]

print(line, flush=True)

def display_progress(self, test_index, text):
if self.ns.quiet:
return

# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count}"
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")

def parse_args(self, kwargs):
ns = _parse_args(sys.argv[1:], **kwargs)

Expand Down Expand Up @@ -297,11 +304,11 @@ def rerun_failed_tests(self):

self.first_result = self.get_tests_result()

print()
print("Re-running failed tests in verbose mode")
self.log()
self.log("Re-running failed tests in verbose mode")
self.rerun = self.bad[:]
for test_name in self.rerun:
print(f"Re-running {test_name} in verbose mode", flush=True)
self.log(f"Re-running {test_name} in verbose mode")
self.ns.verbose = True
result = runtest(self.ns, test_name)

Expand Down Expand Up @@ -382,7 +389,7 @@ def run_tests_sequential(self):

save_modules = sys.modules.keys()

print("Run tests sequentially")
self.log("Run tests sequentially")

previous_test = None
for test_index, test_name in enumerate(self.tests, 1):
Expand Down
27 changes: 14 additions & 13 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ class ExitThread(Exception):


class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id, pending, output, ns, timeout):
def __init__(self, worker_id, runner):
super().__init__()
self.worker_id = worker_id
self.pending = pending
self.output = output
self.ns = ns
self.timeout = timeout
self.pending = runner.pending
self.output = runner.output
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.current_test_name = None
self.start_time = None
self._popen = None
Expand Down Expand Up @@ -292,7 +293,8 @@ def wait_stopped(self, start_time):
if not self.is_alive():
break
dt = time.monotonic() - start_time
print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
self.regrtest.log(f"Waiting for {self} thread "
f"for {format_duration(dt)}")
if dt > JOIN_TIMEOUT:
print_warning(f"Failed to join {self} in {format_duration(dt)}")
break
Expand All @@ -314,6 +316,7 @@ def get_running(workers):
class MultiprocessTestRunner:
def __init__(self, regrtest):
self.regrtest = regrtest
self.log = self.regrtest.log
self.ns = regrtest.ns
self.output = queue.Queue()
self.pending = MultiprocessIterator(self.regrtest.tests)
Expand All @@ -324,11 +327,10 @@ def __init__(self, regrtest):
self.workers = None

def start_workers(self):
self.workers = [TestWorkerProcess(index, self.pending, self.output,
self.ns, self.worker_timeout)
self.workers = [TestWorkerProcess(index, self)
for index in range(1, self.ns.use_mp + 1)]
print("Run tests in parallel using %s child processes"
% len(self.workers))
self.log("Run tests in parallel using %s child processes"
% len(self.workers))
for worker in self.workers:
worker.start()

Expand Down Expand Up @@ -362,7 +364,7 @@ def _get_result(self):
# display progress
running = get_running(self.workers)
if running and not self.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
self.log('running: %s' % ', '.join(running))

def display_result(self, mp_result):
result = mp_result.result
Expand All @@ -382,8 +384,7 @@ def _process_result(self, item):
if item[0]:
# Thread got an exception
format_exc = item[1]
print(f"regrtest worker thread failed: {format_exc}",
file=sys.stderr, flush=True)
print_warning(f"regrtest worker thread failed: {format_exc}")
return True

self.test_index += 1
Expand Down
11 changes: 7 additions & 4 deletions Lib/test/libregrtest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ def format_duration(seconds):
if minutes:
parts.append('%s min' % minutes)
if seconds:
parts.append('%s sec' % seconds)
if ms:
parts.append('%s ms' % ms)
if parts:
# 2 min 1 sec
parts.append('%s sec' % seconds)
else:
# 1.0 sec
parts.append('%.1f sec' % (seconds + ms / 1000))
if not parts:
return '0 ms'
return '%s ms' % ms

parts = parts[:2]
return ' '.join(parts)
Expand Down
121 changes: 84 additions & 37 deletions Lib/test/libregrtest/win_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import _winapi
import math
import msvcrt
import os
import subprocess
Expand All @@ -10,11 +11,14 @@

# Max size of asynchronous reads
BUFSIZE = 8192
# Exponential damping factor (see below)
LOAD_FACTOR_1 = 0.9200444146293232478931553241

# Seconds per measurement
SAMPLING_INTERVAL = 5
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
# on 1 minute (60 seconds)
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5
# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
# of typeperf are registered
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
Expand All @@ -30,9 +34,10 @@ class WindowsLoadTracker():
"""

def __init__(self):
self.load = 0.0
self.counter_name = ''
self.popen = None
self._values = []
self._load = None
self._buffer = ''
self._popen = None
self.start()

def start(self):
Expand Down Expand Up @@ -64,7 +69,7 @@ def start(self):
# Spawn off the load monitor
counter_name = self._get_counter_name()
command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)

# Close our copy of the write end of the pipe
os.close(command_stdout)
Expand All @@ -84,60 +89,102 @@ def _get_counter_name(self):
process_queue_length = counters_dict['44']
return f'"\\{system}\\{process_queue_length}"'

def close(self):
if self.popen is None:
def close(self, kill=True):
if self._popen is None:
return
self.popen.kill()
self.popen.wait()
self.popen = None

self._load = None

if kill:
self._popen.kill()
self._popen.wait()
self._popen = None

def __del__(self):
self.close()

def read_output(self):
def _parse_line(self, line):
# typeperf outputs in a CSV format like this:
# "07/19/2018 01:32:26.605","3.000000"
# (date, process queue length)
tokens = line.split(',')
if len(tokens) != 2:
raise ValueError

value = tokens[1]
if not value.startswith('"') or not value.endswith('"'):
raise ValueError
value = value[1:-1]
return float(value)

def _read_lines(self):
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
bytes_read, res = overlapped.GetOverlappedResult(False)
if res != 0:
return
return ()

output = overlapped.getbuffer()
return output.decode('oem', 'replace')
output = output.decode('oem', 'replace')
output = self._buffer + output
lines = output.splitlines(True)

# bpo-36670: typeperf only writes a newline *before* writing a value,
# not after. Sometimes, the written line in incomplete (ex: only
# timestamp, without the process queue length). Only pass the last line
# to the parser if it's a valid value, otherwise store it in
# self._buffer.
try:
self._parse_line(lines[-1])
except ValueError:
self._buffer = lines.pop(-1)
else:
self._buffer = ''

return lines

def getloadavg(self):
typeperf_output = self.read_output()
# Nothing to update, just return the current load
if not typeperf_output:
return self.load
if self._popen is None:
return None

returncode = self._popen.poll()
if returncode is not None:
self.close(kill=False)
return None

try:
lines = self._read_lines()
except BrokenPipeError:
self.close()
return None

for line in lines:
line = line.rstrip()

# Process the backlog of load values
for line in typeperf_output.splitlines():
# Ignore the initial header:
# "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
if '\\\\' in line:
if 'PDH-CSV' in line:
continue

# Ignore blank lines
if not line.strip():
if not line:
continue

# typeperf outputs in a CSV format like this:
# "07/19/2018 01:32:26.605","3.000000"
# (date, process queue length)
try:
tokens = line.split(',')
if len(tokens) != 2:
raise ValueError

value = tokens[1].replace('"', '')
load = float(value)
processor_queue_length = self._parse_line(line)
except ValueError:
print_warning("Failed to parse typeperf output: %a" % line)
continue

# We use an exponentially weighted moving average, imitating the
# load calculation on Unix systems.
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
self.load = new_load

return self.load
# https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
if self._load is not None:
self._load = (self._load * LOAD_FACTOR_1
+ processor_queue_length * (1.0 - LOAD_FACTOR_1))
elif len(self._values) < NVALUE:
self._values.append(processor_queue_length)
else:
self._load = sum(self._values) / len(self._values)

return self._load
14 changes: 8 additions & 6 deletions Lib/test/test_regrtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'

TEST_INTERRUPTED = textwrap.dedent("""
from signal import SIGINT
Expand Down Expand Up @@ -390,8 +391,8 @@ def check_line(self, output, regex):
self.assertRegex(output, regex)

def parse_executed_tests(self, output):
regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
% self.TESTNAME_REGEX)
regex = (r'^%s\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
% (LOG_PREFIX, self.TESTNAME_REGEX))
parser = re.finditer(regex, output, re.MULTILINE)
return list(match.group(1) for match in parser)

Expand Down Expand Up @@ -451,9 +452,10 @@ def list_regex(line_format, tests):
if rerun:
regex = list_regex('%s re-run test%s', rerun)
self.check_line(output, regex)
self.check_line(output, "Re-running failed tests in verbose mode")
regex = LOG_PREFIX + r"Re-running failed tests in verbose mode"
self.check_line(output, regex)
for test_name in rerun:
regex = f"Re-running {test_name} in verbose mode"
regex = LOG_PREFIX + f"Re-running {test_name} in verbose mode"
self.check_line(output, regex)

if no_test_ran:
Expand Down Expand Up @@ -1173,9 +1175,9 @@ def test_format_duration(self):
self.assertEqual(utils.format_duration(10e-3),
'10 ms')
self.assertEqual(utils.format_duration(1.5),
'1 sec 500 ms')
'1.5 sec')
self.assertEqual(utils.format_duration(1),
'1 sec')
'1.0 sec')
self.assertEqual(utils.format_duration(2 * 60),
'2 min')
self.assertEqual(utils.format_duration(2 * 60 + 1),
Expand Down