diff --git a/Lib/test/support/hypothesis_helper.py b/Lib/test/support/hypothesis_helper.py index 40f58a2f59..db93eea5e9 100644 --- a/Lib/test/support/hypothesis_helper.py +++ b/Lib/test/support/hypothesis_helper.py @@ -5,13 +5,6 @@ except ImportError: from . import _hypothesis_stubs as hypothesis else: - # Regrtest changes to use a tempdir as the working directory, so we have - # to tell Hypothesis to use the original in order to persist the database. - from .os_helper import SAVEDCWD - from hypothesis.configuration import set_hypothesis_home_dir - - set_hypothesis_home_dir(os.path.join(SAVEDCWD, ".hypothesis")) - # When using the real Hypothesis, we'll configure it to ignore occasional # slow tests (avoiding flakiness from random VM slowness in CI). hypothesis.settings.register_profile( diff --git a/Lib/test/support/i18n_helper.py b/Lib/test/support/i18n_helper.py new file mode 100644 index 0000000000..2e304f29e8 --- /dev/null +++ b/Lib/test/support/i18n_helper.py @@ -0,0 +1,63 @@ +import re +import subprocess +import sys +import unittest +from pathlib import Path +from test.support import REPO_ROOT, TEST_HOME_DIR, requires_subprocess +from test.test_tools import skip_if_missing + + +pygettext = Path(REPO_ROOT) / 'Tools' / 'i18n' / 'pygettext.py' + +msgid_pattern = re.compile(r'msgid(.*?)(?:msgid_plural|msgctxt|msgstr)', + re.DOTALL) +msgid_string_pattern = re.compile(r'"((?:\\"|[^"])*)"') + + +def _generate_po_file(path, *, stdout_only=True): + res = subprocess.run([sys.executable, pygettext, + '--no-location', '-o', '-', path], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True) + if stdout_only: + return res.stdout + return res + + +def _extract_msgids(po): + msgids = [] + for msgid in msgid_pattern.findall(po): + msgid_string = ''.join(msgid_string_pattern.findall(msgid)) + msgid_string = msgid_string.replace(r'\"', '"') + if msgid_string: + msgids.append(msgid_string) + return sorted(msgids) + + +def _get_snapshot_path(module_name): + return Path(TEST_HOME_DIR) / 'translationdata' / module_name / 'msgids.txt' + + +@requires_subprocess() +class TestTranslationsBase(unittest.TestCase): + + def assertMsgidsEqual(self, module): + '''Assert that msgids extracted from a given module match a + snapshot. + + ''' + skip_if_missing('i18n') + res = _generate_po_file(module.__file__, stdout_only=False) + self.assertEqual(res.returncode, 0) + self.assertEqual(res.stderr, '') + msgids = _extract_msgids(res.stdout) + snapshot_path = _get_snapshot_path(module.__name__) + snapshot = snapshot_path.read_text().splitlines() + self.assertListEqual(msgids, snapshot) + + +def update_translation_snapshots(module): + contents = _generate_po_file(module.__file__) + msgids = _extract_msgids(contents) + snapshot_path = _get_snapshot_path(module.__name__) + snapshot_path.write_text('\n'.join(msgids)) diff --git a/Lib/test/support/pty_helper.py b/Lib/test/support/pty_helper.py new file mode 100644 index 0000000000..6587fd4033 --- /dev/null +++ b/Lib/test/support/pty_helper.py @@ -0,0 +1,80 @@ +""" +Helper to run a script in a pseudo-terminal. +""" +import os +import selectors +import subprocess +import sys +from contextlib import ExitStack +from errno import EIO + +from test.support.import_helper import import_module + +def run_pty(script, input=b"dummy input\r", env=None): + pty = import_module('pty') + output = bytearray() + [master, slave] = pty.openpty() + args = (sys.executable, '-c', script) + proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave, env=env) + os.close(slave) + with ExitStack() as cleanup: + cleanup.enter_context(proc) + def terminate(proc): + try: + proc.terminate() + except ProcessLookupError: + # Workaround for Open/Net BSD bug (Issue 16762) + pass + cleanup.callback(terminate, proc) + cleanup.callback(os.close, master) + # Avoid using DefaultSelector and PollSelector. Kqueue() does not + # work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open + # BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4 + # either (Issue 20472). Hopefully the file descriptor is low enough + # to use with select(). + sel = cleanup.enter_context(selectors.SelectSelector()) + sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE) + os.set_blocking(master, False) + while True: + for [_, events] in sel.select(): + if events & selectors.EVENT_READ: + try: + chunk = os.read(master, 0x10000) + except OSError as err: + # Linux raises EIO when slave is closed (Issue 5380) + if err.errno != EIO: + raise + chunk = b"" + if not chunk: + return output + output.extend(chunk) + if events & selectors.EVENT_WRITE: + try: + input = input[os.write(master, input):] + except OSError as err: + # Apparently EIO means the slave was closed + if err.errno != EIO: + raise + input = b"" # Stop writing + if not input: + sel.modify(master, selectors.EVENT_READ) + + +###################################################################### +## Fake stdin (for testing interactive debugging) +###################################################################### + +class FakeInput: + """ + A fake input stream for pdb's interactive debugger. Whenever a + line is read, print it (to simulate the user typing it), and then + return it. The set of lines to return is specified in the + constructor; they should not have trailing newlines. + """ + def __init__(self, lines): + self.lines = lines + + def readline(self): + line = self.lines.pop(0) + print(line) + return line + '\n' diff --git a/Lib/test/support/smtpd.py b/Lib/test/support/smtpd.py index ec4e7d2f4c..6052232ec2 100644 --- a/Lib/test/support/smtpd.py +++ b/Lib/test/support/smtpd.py @@ -180,122 +180,122 @@ def _set_rset_state(self): @property def __server(self): warn("Access to __server attribute on SMTPChannel is deprecated, " - "use 'smtp_server' instead", DeprecationWarning, 2) + "use 'smtp_server' instead", DeprecationWarning, 2) return self.smtp_server @__server.setter def __server(self, value): warn("Setting __server attribute on SMTPChannel is deprecated, " - "set 'smtp_server' instead", DeprecationWarning, 2) + "set 'smtp_server' instead", DeprecationWarning, 2) self.smtp_server = value @property def __line(self): warn("Access to __line attribute on SMTPChannel is deprecated, " - "use 'received_lines' instead", DeprecationWarning, 2) + "use 'received_lines' instead", DeprecationWarning, 2) return self.received_lines @__line.setter def __line(self, value): warn("Setting __line attribute on SMTPChannel is deprecated, " - "set 'received_lines' instead", DeprecationWarning, 2) + "set 'received_lines' instead", DeprecationWarning, 2) self.received_lines = value @property def __state(self): warn("Access to __state attribute on SMTPChannel is deprecated, " - "use 'smtp_state' instead", DeprecationWarning, 2) + "use 'smtp_state' instead", DeprecationWarning, 2) return self.smtp_state @__state.setter def __state(self, value): warn("Setting __state attribute on SMTPChannel is deprecated, " - "set 'smtp_state' instead", DeprecationWarning, 2) + "set 'smtp_state' instead", DeprecationWarning, 2) self.smtp_state = value @property def __greeting(self): warn("Access to __greeting attribute on SMTPChannel is deprecated, " - "use 'seen_greeting' instead", DeprecationWarning, 2) + "use 'seen_greeting' instead", DeprecationWarning, 2) return self.seen_greeting @__greeting.setter def __greeting(self, value): warn("Setting __greeting attribute on SMTPChannel is deprecated, " - "set 'seen_greeting' instead", DeprecationWarning, 2) + "set 'seen_greeting' instead", DeprecationWarning, 2) self.seen_greeting = value @property def __mailfrom(self): warn("Access to __mailfrom attribute on SMTPChannel is deprecated, " - "use 'mailfrom' instead", DeprecationWarning, 2) + "use 'mailfrom' instead", DeprecationWarning, 2) return self.mailfrom @__mailfrom.setter def __mailfrom(self, value): warn("Setting __mailfrom attribute on SMTPChannel is deprecated, " - "set 'mailfrom' instead", DeprecationWarning, 2) + "set 'mailfrom' instead", DeprecationWarning, 2) self.mailfrom = value @property def __rcpttos(self): warn("Access to __rcpttos attribute on SMTPChannel is deprecated, " - "use 'rcpttos' instead", DeprecationWarning, 2) + "use 'rcpttos' instead", DeprecationWarning, 2) return self.rcpttos @__rcpttos.setter def __rcpttos(self, value): warn("Setting __rcpttos attribute on SMTPChannel is deprecated, " - "set 'rcpttos' instead", DeprecationWarning, 2) + "set 'rcpttos' instead", DeprecationWarning, 2) self.rcpttos = value @property def __data(self): warn("Access to __data attribute on SMTPChannel is deprecated, " - "use 'received_data' instead", DeprecationWarning, 2) + "use 'received_data' instead", DeprecationWarning, 2) return self.received_data @__data.setter def __data(self, value): warn("Setting __data attribute on SMTPChannel is deprecated, " - "set 'received_data' instead", DeprecationWarning, 2) + "set 'received_data' instead", DeprecationWarning, 2) self.received_data = value @property def __fqdn(self): warn("Access to __fqdn attribute on SMTPChannel is deprecated, " - "use 'fqdn' instead", DeprecationWarning, 2) + "use 'fqdn' instead", DeprecationWarning, 2) return self.fqdn @__fqdn.setter def __fqdn(self, value): warn("Setting __fqdn attribute on SMTPChannel is deprecated, " - "set 'fqdn' instead", DeprecationWarning, 2) + "set 'fqdn' instead", DeprecationWarning, 2) self.fqdn = value @property def __peer(self): warn("Access to __peer attribute on SMTPChannel is deprecated, " - "use 'peer' instead", DeprecationWarning, 2) + "use 'peer' instead", DeprecationWarning, 2) return self.peer @__peer.setter def __peer(self, value): warn("Setting __peer attribute on SMTPChannel is deprecated, " - "set 'peer' instead", DeprecationWarning, 2) + "set 'peer' instead", DeprecationWarning, 2) self.peer = value @property def __conn(self): warn("Access to __conn attribute on SMTPChannel is deprecated, " - "use 'conn' instead", DeprecationWarning, 2) + "use 'conn' instead", DeprecationWarning, 2) return self.conn @__conn.setter def __conn(self, value): warn("Setting __conn attribute on SMTPChannel is deprecated, " - "set 'conn' instead", DeprecationWarning, 2) + "set 'conn' instead", DeprecationWarning, 2) self.conn = value @property def __addr(self): warn("Access to __addr attribute on SMTPChannel is deprecated, " - "use 'addr' instead", DeprecationWarning, 2) + "use 'addr' instead", DeprecationWarning, 2) return self.addr @__addr.setter def __addr(self, value): warn("Setting __addr attribute on SMTPChannel is deprecated, " - "set 'addr' instead", DeprecationWarning, 2) + "set 'addr' instead", DeprecationWarning, 2) self.addr = value # Overrides base class for convenience. @@ -339,7 +339,7 @@ def found_terminator(self): command = line[:i].upper() arg = line[i+1:].strip() max_sz = (self.command_size_limits[command] - if self.extended_smtp else self.command_size_limit) + if self.extended_smtp else self.command_size_limit) if sz > max_sz: self.push('500 Error: line too long') return diff --git a/Lib/test/support/venv.py b/Lib/test/support/venv.py new file mode 100644 index 0000000000..78e6a51ec1 --- /dev/null +++ b/Lib/test/support/venv.py @@ -0,0 +1,70 @@ +import contextlib +import logging +import os +import subprocess +import shlex +import sys +import sysconfig +import tempfile +import venv + + +class VirtualEnvironment: + def __init__(self, prefix, **venv_create_args): + self._logger = logging.getLogger(self.__class__.__name__) + venv.create(prefix, **venv_create_args) + self._prefix = prefix + self._paths = sysconfig.get_paths( + scheme='venv', + vars={'base': self.prefix}, + expand=True, + ) + + @classmethod + @contextlib.contextmanager + def from_tmpdir(cls, *, prefix=None, dir=None, **venv_create_args): + delete = not bool(os.environ.get('PYTHON_TESTS_KEEP_VENV')) + with tempfile.TemporaryDirectory(prefix=prefix, dir=dir, delete=delete) as tmpdir: + yield cls(tmpdir, **venv_create_args) + + @property + def prefix(self): + return self._prefix + + @property + def paths(self): + return self._paths + + @property + def interpreter(self): + return os.path.join(self.paths['scripts'], os.path.basename(sys.executable)) + + def _format_output(self, name, data, indent='\t'): + if not data: + return indent + f'{name}: (none)' + if len(data.splitlines()) == 1: + return indent + f'{name}: {data}' + else: + prefixed_lines = '\n'.join(indent + '> ' + line for line in data.splitlines()) + return indent + f'{name}:\n' + prefixed_lines + + def run(self, *args, **subprocess_args): + if subprocess_args.get('shell'): + raise ValueError('Running the subprocess in shell mode is not supported.') + default_args = { + 'capture_output': True, + 'check': True, + } + try: + result = subprocess.run([self.interpreter, *args], **default_args | subprocess_args) + except subprocess.CalledProcessError as e: + if e.returncode != 0: + self._logger.error( + f'Interpreter returned non-zero exit status {e.returncode}.\n' + + self._format_output('COMMAND', shlex.join(e.cmd)) + '\n' + + self._format_output('STDOUT', e.stdout.decode()) + '\n' + + self._format_output('STDERR', e.stderr.decode()) + '\n' + ) + raise + else: + return result