Skip to content

Upgrade parts of test.support #5686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Lib/test/support/hypothesis_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@
except ImportError:
from . import _hypothesis_stubs as hypothesis
else:
# Regrtest changes to use a tempdir as the working directory, so we have
# to tell Hypothesis to use the original in order to persist the database.
from .os_helper import SAVEDCWD
from hypothesis.configuration import set_hypothesis_home_dir

set_hypothesis_home_dir(os.path.join(SAVEDCWD, ".hypothesis"))

# When using the real Hypothesis, we'll configure it to ignore occasional
# slow tests (avoiding flakiness from random VM slowness in CI).
hypothesis.settings.register_profile(
Expand Down
63 changes: 63 additions & 0 deletions Lib/test/support/i18n_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import re
import subprocess
import sys
import unittest
from pathlib import Path
from test.support import REPO_ROOT, TEST_HOME_DIR, requires_subprocess
from test.test_tools import skip_if_missing


pygettext = Path(REPO_ROOT) / 'Tools' / 'i18n' / 'pygettext.py'

msgid_pattern = re.compile(r'msgid(.*?)(?:msgid_plural|msgctxt|msgstr)',
re.DOTALL)
msgid_string_pattern = re.compile(r'"((?:\\"|[^"])*)"')


def _generate_po_file(path, *, stdout_only=True):
res = subprocess.run([sys.executable, pygettext,
'--no-location', '-o', '-', path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True)
if stdout_only:
return res.stdout
return res


def _extract_msgids(po):
msgids = []
for msgid in msgid_pattern.findall(po):
msgid_string = ''.join(msgid_string_pattern.findall(msgid))
msgid_string = msgid_string.replace(r'\"', '"')
if msgid_string:
msgids.append(msgid_string)
return sorted(msgids)


def _get_snapshot_path(module_name):
return Path(TEST_HOME_DIR) / 'translationdata' / module_name / 'msgids.txt'


@requires_subprocess()
class TestTranslationsBase(unittest.TestCase):

def assertMsgidsEqual(self, module):
'''Assert that msgids extracted from a given module match a
snapshot.

'''
skip_if_missing('i18n')
res = _generate_po_file(module.__file__, stdout_only=False)
self.assertEqual(res.returncode, 0)
self.assertEqual(res.stderr, '')
msgids = _extract_msgids(res.stdout)
snapshot_path = _get_snapshot_path(module.__name__)
snapshot = snapshot_path.read_text().splitlines()
self.assertListEqual(msgids, snapshot)


def update_translation_snapshots(module):
contents = _generate_po_file(module.__file__)
msgids = _extract_msgids(contents)
snapshot_path = _get_snapshot_path(module.__name__)
snapshot_path.write_text('\n'.join(msgids))
80 changes: 80 additions & 0 deletions Lib/test/support/pty_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Helper to run a script in a pseudo-terminal.
"""
import os
import selectors
import subprocess
import sys
from contextlib import ExitStack
from errno import EIO

from test.support.import_helper import import_module

def run_pty(script, input=b"dummy input\r", env=None):
pty = import_module('pty')
output = bytearray()
[master, slave] = pty.openpty()
args = (sys.executable, '-c', script)
proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave, env=env)
os.close(slave)
with ExitStack() as cleanup:
cleanup.enter_context(proc)
def terminate(proc):
try:
proc.terminate()
except ProcessLookupError:
# Workaround for Open/Net BSD bug (Issue 16762)
pass
cleanup.callback(terminate, proc)
cleanup.callback(os.close, master)
# Avoid using DefaultSelector and PollSelector. Kqueue() does not
# work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open
# BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4
# either (Issue 20472). Hopefully the file descriptor is low enough
# to use with select().
sel = cleanup.enter_context(selectors.SelectSelector())
sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE)
os.set_blocking(master, False)
while True:
for [_, events] in sel.select():
if events & selectors.EVENT_READ:
try:
chunk = os.read(master, 0x10000)
except OSError as err:
# Linux raises EIO when slave is closed (Issue 5380)
if err.errno != EIO:
raise
chunk = b""
if not chunk:
return output
output.extend(chunk)
if events & selectors.EVENT_WRITE:
try:
input = input[os.write(master, input):]
except OSError as err:
# Apparently EIO means the slave was closed
if err.errno != EIO:
raise
input = b"" # Stop writing
if not input:
sel.modify(master, selectors.EVENT_READ)


######################################################################
## Fake stdin (for testing interactive debugging)
######################################################################

class FakeInput:
"""
A fake input stream for pdb's interactive debugger. Whenever a
line is read, print it (to simulate the user typing it), and then
return it. The set of lines to return is specified in the
constructor; they should not have trailing newlines.
"""
def __init__(self, lines):
self.lines = lines

def readline(self):
line = self.lines.pop(0)
print(line)
return line + '\n'
46 changes: 23 additions & 23 deletions Lib/test/support/smtpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,122 +180,122 @@ def _set_rset_state(self):
@property
def __server(self):
warn("Access to __server attribute on SMTPChannel is deprecated, "
"use 'smtp_server' instead", DeprecationWarning, 2)
"use 'smtp_server' instead", DeprecationWarning, 2)
return self.smtp_server
@__server.setter
def __server(self, value):
warn("Setting __server attribute on SMTPChannel is deprecated, "
"set 'smtp_server' instead", DeprecationWarning, 2)
"set 'smtp_server' instead", DeprecationWarning, 2)
self.smtp_server = value

@property
def __line(self):
warn("Access to __line attribute on SMTPChannel is deprecated, "
"use 'received_lines' instead", DeprecationWarning, 2)
"use 'received_lines' instead", DeprecationWarning, 2)
return self.received_lines
@__line.setter
def __line(self, value):
warn("Setting __line attribute on SMTPChannel is deprecated, "
"set 'received_lines' instead", DeprecationWarning, 2)
"set 'received_lines' instead", DeprecationWarning, 2)
self.received_lines = value

@property
def __state(self):
warn("Access to __state attribute on SMTPChannel is deprecated, "
"use 'smtp_state' instead", DeprecationWarning, 2)
"use 'smtp_state' instead", DeprecationWarning, 2)
return self.smtp_state
@__state.setter
def __state(self, value):
warn("Setting __state attribute on SMTPChannel is deprecated, "
"set 'smtp_state' instead", DeprecationWarning, 2)
"set 'smtp_state' instead", DeprecationWarning, 2)
self.smtp_state = value

@property
def __greeting(self):
warn("Access to __greeting attribute on SMTPChannel is deprecated, "
"use 'seen_greeting' instead", DeprecationWarning, 2)
"use 'seen_greeting' instead", DeprecationWarning, 2)
return self.seen_greeting
@__greeting.setter
def __greeting(self, value):
warn("Setting __greeting attribute on SMTPChannel is deprecated, "
"set 'seen_greeting' instead", DeprecationWarning, 2)
"set 'seen_greeting' instead", DeprecationWarning, 2)
self.seen_greeting = value

@property
def __mailfrom(self):
warn("Access to __mailfrom attribute on SMTPChannel is deprecated, "
"use 'mailfrom' instead", DeprecationWarning, 2)
"use 'mailfrom' instead", DeprecationWarning, 2)
return self.mailfrom
@__mailfrom.setter
def __mailfrom(self, value):
warn("Setting __mailfrom attribute on SMTPChannel is deprecated, "
"set 'mailfrom' instead", DeprecationWarning, 2)
"set 'mailfrom' instead", DeprecationWarning, 2)
self.mailfrom = value

@property
def __rcpttos(self):
warn("Access to __rcpttos attribute on SMTPChannel is deprecated, "
"use 'rcpttos' instead", DeprecationWarning, 2)
"use 'rcpttos' instead", DeprecationWarning, 2)
return self.rcpttos
@__rcpttos.setter
def __rcpttos(self, value):
warn("Setting __rcpttos attribute on SMTPChannel is deprecated, "
"set 'rcpttos' instead", DeprecationWarning, 2)
"set 'rcpttos' instead", DeprecationWarning, 2)
self.rcpttos = value

@property
def __data(self):
warn("Access to __data attribute on SMTPChannel is deprecated, "
"use 'received_data' instead", DeprecationWarning, 2)
"use 'received_data' instead", DeprecationWarning, 2)
return self.received_data
@__data.setter
def __data(self, value):
warn("Setting __data attribute on SMTPChannel is deprecated, "
"set 'received_data' instead", DeprecationWarning, 2)
"set 'received_data' instead", DeprecationWarning, 2)
self.received_data = value

@property
def __fqdn(self):
warn("Access to __fqdn attribute on SMTPChannel is deprecated, "
"use 'fqdn' instead", DeprecationWarning, 2)
"use 'fqdn' instead", DeprecationWarning, 2)
return self.fqdn
@__fqdn.setter
def __fqdn(self, value):
warn("Setting __fqdn attribute on SMTPChannel is deprecated, "
"set 'fqdn' instead", DeprecationWarning, 2)
"set 'fqdn' instead", DeprecationWarning, 2)
self.fqdn = value

@property
def __peer(self):
warn("Access to __peer attribute on SMTPChannel is deprecated, "
"use 'peer' instead", DeprecationWarning, 2)
"use 'peer' instead", DeprecationWarning, 2)
return self.peer
@__peer.setter
def __peer(self, value):
warn("Setting __peer attribute on SMTPChannel is deprecated, "
"set 'peer' instead", DeprecationWarning, 2)
"set 'peer' instead", DeprecationWarning, 2)
self.peer = value

@property
def __conn(self):
warn("Access to __conn attribute on SMTPChannel is deprecated, "
"use 'conn' instead", DeprecationWarning, 2)
"use 'conn' instead", DeprecationWarning, 2)
return self.conn
@__conn.setter
def __conn(self, value):
warn("Setting __conn attribute on SMTPChannel is deprecated, "
"set 'conn' instead", DeprecationWarning, 2)
"set 'conn' instead", DeprecationWarning, 2)
self.conn = value

@property
def __addr(self):
warn("Access to __addr attribute on SMTPChannel is deprecated, "
"use 'addr' instead", DeprecationWarning, 2)
"use 'addr' instead", DeprecationWarning, 2)
return self.addr
@__addr.setter
def __addr(self, value):
warn("Setting __addr attribute on SMTPChannel is deprecated, "
"set 'addr' instead", DeprecationWarning, 2)
"set 'addr' instead", DeprecationWarning, 2)
self.addr = value

# Overrides base class for convenience.
Expand Down Expand Up @@ -339,7 +339,7 @@ def found_terminator(self):
command = line[:i].upper()
arg = line[i+1:].strip()
max_sz = (self.command_size_limits[command]
if self.extended_smtp else self.command_size_limit)
if self.extended_smtp else self.command_size_limit)
if sz > max_sz:
self.push('500 Error: line too long')
return
Expand Down
70 changes: 70 additions & 0 deletions Lib/test/support/venv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import contextlib
import logging
import os
import subprocess
import shlex
import sys
import sysconfig
import tempfile
import venv


class VirtualEnvironment:
def __init__(self, prefix, **venv_create_args):
self._logger = logging.getLogger(self.__class__.__name__)
venv.create(prefix, **venv_create_args)
self._prefix = prefix
self._paths = sysconfig.get_paths(
scheme='venv',
vars={'base': self.prefix},
expand=True,
)

@classmethod
@contextlib.contextmanager
def from_tmpdir(cls, *, prefix=None, dir=None, **venv_create_args):
delete = not bool(os.environ.get('PYTHON_TESTS_KEEP_VENV'))
with tempfile.TemporaryDirectory(prefix=prefix, dir=dir, delete=delete) as tmpdir:
yield cls(tmpdir, **venv_create_args)

@property
def prefix(self):
return self._prefix

@property
def paths(self):
return self._paths

@property
def interpreter(self):
return os.path.join(self.paths['scripts'], os.path.basename(sys.executable))

def _format_output(self, name, data, indent='\t'):
if not data:
return indent + f'{name}: (none)'
if len(data.splitlines()) == 1:
return indent + f'{name}: {data}'
else:
prefixed_lines = '\n'.join(indent + '> ' + line for line in data.splitlines())
return indent + f'{name}:\n' + prefixed_lines

def run(self, *args, **subprocess_args):
if subprocess_args.get('shell'):
raise ValueError('Running the subprocess in shell mode is not supported.')
default_args = {
'capture_output': True,
'check': True,
}
try:
result = subprocess.run([self.interpreter, *args], **default_args | subprocess_args)
except subprocess.CalledProcessError as e:
if e.returncode != 0:
self._logger.error(
f'Interpreter returned non-zero exit status {e.returncode}.\n'
+ self._format_output('COMMAND', shlex.join(e.cmd)) + '\n'
+ self._format_output('STDOUT', e.stdout.decode()) + '\n'
+ self._format_output('STDERR', e.stderr.decode()) + '\n'
)
raise
else:
return result
Loading