From 15fc64dc1e379559da1991d98cc23e33d945b14f Mon Sep 17 00:00:00 2001 From: Aviv Palivoda Date: Tue, 12 Nov 2019 16:01:48 +0200 Subject: [PATCH 1/2] Copy test_bool.py and test.support from CPython --- Lib/test/support/__init__.py | 3036 +++++++++++++++++++++++++++++ Lib/test/support/script_helper.py | 263 +++ Lib/test/support/testresult.py | 204 ++ Lib/test/test_bool.py | 369 ++++ 4 files changed, 3872 insertions(+) create mode 100644 Lib/test/support/__init__.py create mode 100644 Lib/test/support/script_helper.py create mode 100644 Lib/test/support/testresult.py create mode 100644 Lib/test/test_bool.py diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py new file mode 100644 index 0000000000..9e60d960ab --- /dev/null +++ b/Lib/test/support/__init__.py @@ -0,0 +1,3036 @@ +"""Supporting definitions for the Python regression tests.""" + +if __name__ != 'test.support': + raise ImportError('support must be imported from the test package') + +import asyncio.events +import collections.abc +import contextlib +import datetime +import errno +import faulthandler +import fnmatch +import functools +import gc +import importlib +import importlib.util +import io +import logging.handlers +import nntplib +import os +import platform +import re +import shutil +import socket +import stat +import struct +import subprocess +import sys +import sysconfig +import tempfile +import _thread +import threading +import time +import types +import unittest +import urllib.error +import warnings + +from .testresult import get_test_runner + +try: + import multiprocessing.process +except ImportError: + multiprocessing = None + +try: + import zlib +except ImportError: + zlib = None + +try: + import gzip +except ImportError: + gzip = None + +try: + import bz2 +except ImportError: + bz2 = None + +try: + import lzma +except ImportError: + lzma = None + +try: + import resource +except ImportError: + resource = None + +__all__ = [ + # globals + "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", + # exceptions + "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", + # imports + "import_module", "import_fresh_module", "CleanImport", + # modules + "unload", "forget", + # io + "record_original_stdout", "get_original_stdout", "captured_stdout", + "captured_stdin", "captured_stderr", + # filesystem + "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile", + "create_empty_file", "can_symlink", "fs_is_case_insensitive", + # unittest + "is_resource_enabled", "requires", "requires_freebsd_version", + "requires_linux_version", "requires_mac_ver", "check_syntax_error", + "check_syntax_warning", + "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset", + "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest", + "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma", + "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", + "requires_IEEE_754", "skip_unless_xattr", "requires_zlib", + "anticipate_failure", "load_package_tests", "detect_api_mismatch", + "check__all__", "skip_unless_bind_unix_socket", + "ignore_warnings", + # sys + "is_jython", "is_android", "check_impl_detail", "unix_shell", + "setswitchinterval", + # network + "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", + "bind_unix_socket", + # processes + 'temp_umask', "reap_children", + # logging + "TestHandler", + # threads + "threading_setup", "threading_cleanup", "reap_threads", "start_threads", + # miscellaneous + "check_warnings", "check_no_resource_warning", "check_no_warnings", + "EnvironmentVarGuard", + "run_with_locale", "swap_item", + "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", + "run_with_tz", "PGO", "missing_compiler_executable", "fd_count", + ] + +class Error(Exception): + """Base class for regression test exceptions.""" + +class TestFailed(Error): + """Test failed.""" + +class TestDidNotRun(Error): + """Test did not run any subtests.""" + +class ResourceDenied(unittest.SkipTest): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not be enabled. It is used to distinguish between expected + and unexpected skips. + """ + +@contextlib.contextmanager +def _ignore_deprecated_imports(ignore=True): + """Context manager to suppress package and module deprecation + warnings when importing them. + + If ignore is False, this context manager has no effect. + """ + if ignore: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", ".+ (module|package)", + DeprecationWarning) + yield + else: + yield + + +def ignore_warnings(*, category): + """Decorator to suppress deprecation warnings. + + Use of context managers to hide warnings make diffs + more noisy and tools like 'git blame' less useful. + """ + def decorator(test): + @functools.wraps(test) + def wrapper(self, *args, **kwargs): + with warnings.catch_warnings(): + warnings.simplefilter('ignore', category=category) + return test(self, *args, **kwargs) + return wrapper + return decorator + + +def import_module(name, deprecated=False, *, required_on=()): + """Import and return the module to be tested, raising SkipTest if + it is not available. + + If deprecated is True, any module or package deprecation messages + will be suppressed. If a module is required on a platform but optional for + others, set required_on to an iterable of platform prefixes which will be + compared against sys.platform. + """ + with _ignore_deprecated_imports(deprecated): + try: + return importlib.import_module(name) + except ImportError as msg: + if sys.platform.startswith(tuple(required_on)): + raise + raise unittest.SkipTest(str(msg)) + + +def _save_and_remove_module(name, orig_modules): + """Helper function to save and remove a module from sys.modules + + Raise ImportError if the module can't be imported. + """ + # try to import the module and raise an error if it can't be imported + if name not in sys.modules: + __import__(name) + del sys.modules[name] + for modname in list(sys.modules): + if modname == name or modname.startswith(name + '.'): + orig_modules[modname] = sys.modules[modname] + del sys.modules[modname] + +def _save_and_block_module(name, orig_modules): + """Helper function to save and block a module in sys.modules + + Return True if the module was in sys.modules, False otherwise. + """ + saved = True + try: + orig_modules[name] = sys.modules[name] + except KeyError: + saved = False + sys.modules[name] = None + return saved + + +def anticipate_failure(condition): + """Decorator to mark a test that is known to be broken in some cases + + Any use of this decorator should have a comment identifying the + associated tracker issue. + """ + if condition: + return unittest.expectedFailure + return lambda f: f + +def load_package_tests(pkg_dir, loader, standard_tests, pattern): + """Generic load_tests implementation for simple test packages. + + Most packages can implement load_tests using this function as follows: + + def load_tests(*args): + return load_package_tests(os.path.dirname(__file__), *args) + """ + if pattern is None: + pattern = "test*" + top_dir = os.path.dirname( # Lib + os.path.dirname( # test + os.path.dirname(__file__))) # support + package_tests = loader.discover(start_dir=pkg_dir, + top_level_dir=top_dir, + pattern=pattern) + standard_tests.addTests(package_tests) + return standard_tests + + +def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): + """Import and return a module, deliberately bypassing sys.modules. + + This function imports and returns a fresh copy of the named Python module + by removing the named module from sys.modules before doing the import. + Note that unlike reload, the original module is not affected by + this operation. + + *fresh* is an iterable of additional module names that are also removed + from the sys.modules cache before doing the import. + + *blocked* is an iterable of module names that are replaced with None + in the module cache during the import to ensure that attempts to import + them raise ImportError. + + The named module and any modules named in the *fresh* and *blocked* + parameters are saved before starting the import and then reinserted into + sys.modules when the fresh import is complete. + + Module and package deprecation messages are suppressed during this import + if *deprecated* is True. + + This function will raise ImportError if the named module cannot be + imported. + """ + # NOTE: test_heapq, test_json and test_warnings include extra sanity checks + # to make sure that this utility function is working as expected + with _ignore_deprecated_imports(deprecated): + # Keep track of modules saved for later restoration as well + # as those which just need a blocking entry removed + orig_modules = {} + names_to_remove = [] + _save_and_remove_module(name, orig_modules) + try: + for fresh_name in fresh: + _save_and_remove_module(fresh_name, orig_modules) + for blocked_name in blocked: + if not _save_and_block_module(blocked_name, orig_modules): + names_to_remove.append(blocked_name) + fresh_module = importlib.import_module(name) + except ImportError: + fresh_module = None + finally: + for orig_name, module in orig_modules.items(): + sys.modules[orig_name] = module + for name_to_remove in names_to_remove: + del sys.modules[name_to_remove] + return fresh_module + + +def get_attribute(obj, name): + """Get an attribute, raising SkipTest if AttributeError is raised.""" + try: + attribute = getattr(obj, name) + except AttributeError: + raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) + else: + return attribute + +verbose = 1 # Flag set to 0 by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) +real_max_memuse = 0 +junit_xml_list = None # list of testsuite XML elements +failfast = False + +# _original_stdout is meant to hold stdout at the time regrtest began. +# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. +# The point is to have some flavor of stdout the user can actually see. +_original_stdout = None +def record_original_stdout(stdout): + global _original_stdout + _original_stdout = stdout + +def get_original_stdout(): + return _original_stdout or sys.stdout + +def unload(name): + try: + del sys.modules[name] + except KeyError: + pass + +def _force_run(path, func, *args): + try: + return func(*args) + except OSError as err: + if verbose >= 2: + print('%s: %s' % (err.__class__.__name__, err)) + print('re-run %s%r' % (func.__name__, args)) + os.chmod(path, stat.S_IRWXU) + return func(*args) + +if sys.platform.startswith("win"): + def _waitfor(func, pathname, waitall=False): + # Perform the operation + func(pathname) + # Now setup the wait loop + if waitall: + dirname = pathname + else: + dirname, name = os.path.split(pathname) + dirname = dirname or '.' + # Check for `pathname` to be removed from the filesystem. + # The exponential backoff of the timeout amounts to a total + # of ~1 second after which the deletion is probably an error + # anyway. + # Testing on an i7@4.3GHz shows that usually only 1 iteration is + # required when contention occurs. + timeout = 0.001 + while timeout < 1.0: + # Note we are only testing for the existence of the file(s) in + # the contents of the directory regardless of any security or + # access rights. If we have made it this far, we have sufficient + # permissions to do that much using Python's equivalent of the + # Windows API FindFirstFile. + # Other Windows APIs can fail or give incorrect results when + # dealing with files that are pending deletion. + L = os.listdir(dirname) + if not (L if waitall else name in L): + return + # Increase the timeout and try again + time.sleep(timeout) + timeout *= 2 + warnings.warn('tests may fail, delete still pending for ' + pathname, + RuntimeWarning, stacklevel=4) + + def _unlink(filename): + _waitfor(os.unlink, filename) + + def _rmdir(dirname): + _waitfor(os.rmdir, dirname) + + def _rmtree(path): + def _rmtree_inner(path): + for name in _force_run(path, os.listdir, path): + fullname = os.path.join(path, name) + try: + mode = os.lstat(fullname).st_mode + except OSError as exc: + print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), + file=sys.__stderr__) + mode = 0 + if stat.S_ISDIR(mode): + _waitfor(_rmtree_inner, fullname, waitall=True) + _force_run(fullname, os.rmdir, fullname) + else: + _force_run(fullname, os.unlink, fullname) + _waitfor(_rmtree_inner, path, waitall=True) + _waitfor(lambda p: _force_run(p, os.rmdir, p), path) + + def _longpath(path): + try: + import ctypes + except ImportError: + # No ctypes means we can't expands paths. + pass + else: + buffer = ctypes.create_unicode_buffer(len(path) * 2) + length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, + len(buffer)) + if length: + return buffer[:length] + return path +else: + _unlink = os.unlink + _rmdir = os.rmdir + + def _rmtree(path): + try: + shutil.rmtree(path) + return + except OSError: + pass + + def _rmtree_inner(path): + for name in _force_run(path, os.listdir, path): + fullname = os.path.join(path, name) + try: + mode = os.lstat(fullname).st_mode + except OSError: + mode = 0 + if stat.S_ISDIR(mode): + _rmtree_inner(fullname) + _force_run(path, os.rmdir, fullname) + else: + _force_run(path, os.unlink, fullname) + _rmtree_inner(path) + os.rmdir(path) + + def _longpath(path): + return path + +def unlink(filename): + try: + _unlink(filename) + except (FileNotFoundError, NotADirectoryError): + pass + +def rmdir(dirname): + try: + _rmdir(dirname) + except FileNotFoundError: + pass + +def rmtree(path): + try: + _rmtree(path) + except FileNotFoundError: + pass + +def make_legacy_pyc(source): + """Move a PEP 3147/488 pyc file to its legacy pyc location. + + :param source: The file system path to the source file. The source file + does not need to exist, however the PEP 3147/488 pyc file must exist. + :return: The file system path to the legacy pyc file. + """ + pyc_file = importlib.util.cache_from_source(source) + up_one = os.path.dirname(os.path.abspath(source)) + legacy_pyc = os.path.join(up_one, source + 'c') + os.rename(pyc_file, legacy_pyc) + return legacy_pyc + +def forget(modname): + """'Forget' a module was ever imported. + + This removes the module from sys.modules and deletes any PEP 3147/488 or + legacy .pyc files. + """ + unload(modname) + for dirname in sys.path: + source = os.path.join(dirname, modname + '.py') + # It doesn't matter if they exist or not, unlink all possible + # combinations of PEP 3147/488 and legacy pyc files. + unlink(source + 'c') + for opt in ('', 1, 2): + unlink(importlib.util.cache_from_source(source, optimization=opt)) + +# Check whether a gui is actually available +def _is_gui_available(): + if hasattr(_is_gui_available, 'result'): + return _is_gui_available.result + reason = None + if sys.platform.startswith('win'): + # if Python is running as a service (such as the buildbot service), + # gui interaction may be disallowed + import ctypes + import ctypes.wintypes + UOI_FLAGS = 1 + WSF_VISIBLE = 0x0001 + class USEROBJECTFLAGS(ctypes.Structure): + _fields_ = [("fInherit", ctypes.wintypes.BOOL), + ("fReserved", ctypes.wintypes.BOOL), + ("dwFlags", ctypes.wintypes.DWORD)] + dll = ctypes.windll.user32 + h = dll.GetProcessWindowStation() + if not h: + raise ctypes.WinError() + uof = USEROBJECTFLAGS() + needed = ctypes.wintypes.DWORD() + res = dll.GetUserObjectInformationW(h, + UOI_FLAGS, + ctypes.byref(uof), + ctypes.sizeof(uof), + ctypes.byref(needed)) + if not res: + raise ctypes.WinError() + if not bool(uof.dwFlags & WSF_VISIBLE): + reason = "gui not available (WSF_VISIBLE flag not set)" + elif sys.platform == 'darwin': + # The Aqua Tk implementations on OS X can abort the process if + # being called in an environment where a window server connection + # cannot be made, for instance when invoked by a buildbot or ssh + # process not running under the same user id as the current console + # user. To avoid that, raise an exception if the window manager + # connection is not available. + from ctypes import cdll, c_int, pointer, Structure + from ctypes.util import find_library + + app_services = cdll.LoadLibrary(find_library("ApplicationServices")) + + if app_services.CGMainDisplayID() == 0: + reason = "gui tests cannot run without OS X window manager" + else: + class ProcessSerialNumber(Structure): + _fields_ = [("highLongOfPSN", c_int), + ("lowLongOfPSN", c_int)] + psn = ProcessSerialNumber() + psn_p = pointer(psn) + if ( (app_services.GetCurrentProcess(psn_p) < 0) or + (app_services.SetFrontProcess(psn_p) < 0) ): + reason = "cannot run without OS X gui process" + + # check on every platform whether tkinter can actually do anything + if not reason: + try: + from tkinter import Tk + root = Tk() + root.withdraw() + root.update() + root.destroy() + except Exception as e: + err_string = str(e) + if len(err_string) > 50: + err_string = err_string[:50] + ' [...]' + reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, + err_string) + + _is_gui_available.reason = reason + _is_gui_available.result = not reason + + return _is_gui_available.result + +def is_resource_enabled(resource): + """Test whether a resource is enabled. + + Known resources are set by regrtest.py. If not running under regrtest.py, + all resources are assumed enabled unless use_resources has been set. + """ + return use_resources is None or resource in use_resources + +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available.""" + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the %r resource not enabled" % resource + raise ResourceDenied(msg) + if resource == 'gui' and not _is_gui_available(): + raise ResourceDenied(_is_gui_available.reason) + +def _requires_unix_version(sysname, min_version): + """Decorator raising SkipTest if the OS is `sysname` and the version is less + than `min_version`. + + For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if + the FreeBSD version is less than 7.2. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if platform.system() == sysname: + version_txt = platform.release().split('-', 1)[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "%s version %s or higher required, not %s" + % (sysname, min_version_txt, version_txt)) + return func(*args, **kw) + wrapper.min_version = min_version + return wrapper + return decorator + +def requires_freebsd_version(*min_version): + """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is + less than `min_version`. + + For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD + version is less than 7.2. + """ + return _requires_unix_version('FreeBSD', min_version) + +def requires_linux_version(*min_version): + """Decorator raising SkipTest if the OS is Linux and the Linux version is + less than `min_version`. + + For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux + version is less than 2.6.32. + """ + return _requires_unix_version('Linux', min_version) + +def requires_mac_ver(*min_version): + """Decorator raising SkipTest if the OS is Mac OS X and the OS X + version if less than min_version. + + For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version + is lesser than 10.5. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if sys.platform == 'darwin': + version_txt = platform.mac_ver()[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "Mac OS X %s or higher required, not %s" + % (min_version_txt, version_txt)) + return func(*args, **kw) + wrapper.min_version = min_version + return wrapper + return decorator + + +HOST = "localhost" +HOSTv4 = "127.0.0.1" +HOSTv6 = "::1" + + +def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): + """Returns an unused port that should be suitable for binding. This is + achieved by creating a temporary socket with the same family and type as + the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to + the specified host address (defaults to 0.0.0.0) with the port set to 0, + eliciting an unused ephemeral port from the OS. The temporary socket is + then closed and deleted, and the ephemeral port is returned. + + Either this method or bind_port() should be used for any tests where a + server socket needs to be bound to a particular port for the duration of + the test. Which one to use depends on whether the calling code is creating + a python socket, or if an unused port needs to be provided in a constructor + or passed to an external program (i.e. the -accept argument to openssl's + s_server mode). Always prefer bind_port() over find_unused_port() where + possible. Hard coded ports should *NEVER* be used. As soon as a server + socket is bound to a hard coded port, the ability to run multiple instances + of the test simultaneously on the same host is compromised, which makes the + test a ticking time bomb in a buildbot environment. On Unix buildbots, this + may simply manifest as a failed test, which can be recovered from without + intervention in most cases, but on Windows, the entire python process can + completely and utterly wedge, requiring someone to log in to the buildbot + and manually kill the affected process. + + (This is easy to reproduce on Windows, unfortunately, and can be traced to + the SO_REUSEADDR socket option having different semantics on Windows versus + Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, + listen and then accept connections on identical host/ports. An EADDRINUSE + OSError will be raised at some point (depending on the platform and + the order bind and listen were called on each socket). + + However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE + will ever be raised when attempting to bind two identical host/ports. When + accept() is called on each socket, the second caller's process will steal + the port from the first caller, leaving them both in an awkwardly wedged + state where they'll no longer respond to any signals or graceful kills, and + must be forcibly killed via OpenProcess()/TerminateProcess(). + + The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option + instead of SO_REUSEADDR, which effectively affords the same semantics as + SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open + Source world compared to Windows ones, this is a common mistake. A quick + look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when + openssl.exe is called with the 's_server' option, for example. See + http://bugs.python.org/issue2550 for more info. The following site also + has a very thorough description about the implications of both REUSEADDR + and EXCLUSIVEADDRUSE on Windows: + http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) + + XXX: although this approach is a vast improvement on previous attempts to + elicit unused ports, it rests heavily on the assumption that the ephemeral + port returned to us by the OS won't immediately be dished back out to some + other process when we close and delete our temporary socket but before our + calling code has a chance to bind the returned port. We can deal with this + issue if/when we come across it. + """ + + with socket.socket(family, socktype) as tempsock: + port = bind_port(tempsock) + del tempsock + return port + +def bind_port(sock, host=HOST): + """Bind the socket to a free port and return the port number. Relies on + ephemeral ports in order to ensure we are using an unbound port. This is + important as many tests may be running simultaneously, especially in a + buildbot environment. This method raises an exception if the sock.family + is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR + or SO_REUSEPORT set on it. Tests should *never* set these socket options + for TCP/IP sockets. The only case for setting these options is testing + multicasting via multiple UDP sockets. + + Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. + on Windows), it will be set on the socket. This will prevent anyone else + from bind()'ing to our host/port for the duration of the test. + """ + + if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: + if hasattr(socket, 'SO_REUSEADDR'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: + raise TestFailed("tests should never set the SO_REUSEADDR " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_REUSEPORT'): + try: + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: + raise TestFailed("tests should never set the SO_REUSEPORT " \ + "socket option on TCP/IP sockets!") + except OSError: + # Python's socket module was compiled using modern headers + # thus defining SO_REUSEPORT but this process is running + # under an older kernel that does not support SO_REUSEPORT. + pass + if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) + + sock.bind((host, 0)) + port = sock.getsockname()[1] + return port + +def bind_unix_socket(sock, addr): + """Bind a unix socket, raising SkipTest if PermissionError is raised.""" + assert sock.family == socket.AF_UNIX + try: + sock.bind(addr) + except PermissionError: + sock.close() + raise unittest.SkipTest('cannot bind AF_UNIX sockets') + +def _is_ipv6_enabled(): + """Check whether IPv6 is enabled on this host.""" + if socket.has_ipv6: + sock = None + try: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.bind((HOSTv6, 0)) + return True + except OSError: + pass + finally: + if sock: + sock.close() + return False + +IPV6_ENABLED = _is_ipv6_enabled() + +def system_must_validate_cert(f): + """Skip the test on TLS certificate validation failures.""" + @functools.wraps(f) + def dec(*args, **kwargs): + try: + f(*args, **kwargs) + except OSError as e: + if "CERTIFICATE_VERIFY_FAILED" in str(e): + raise unittest.SkipTest("system does not contain " + "necessary certificates") + raise + return dec + +# A constant likely larger than the underlying OS pipe buffer size, to +# make writes blocking. +# Windows limit seems to be around 512 B, and many Unix kernels have a +# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. +# (see issue #17835 for a discussion of this number). +PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 + +# A constant likely larger than the underlying OS socket buffer size, to make +# writes blocking. +# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl +# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 +# for a discussion of this number). +SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 + +# decorator for skipping tests on non-IEEE 754 platforms +requires_IEEE_754 = unittest.skipUnless( + float.__getformat__("double").startswith("IEEE"), + "test requires IEEE 754 doubles") + +requires_zlib = unittest.skipUnless(zlib, 'requires zlib') + +requires_gzip = unittest.skipUnless(gzip, 'requires gzip') + +requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') + +requires_lzma = unittest.skipUnless(lzma, 'requires lzma') + +is_jython = sys.platform.startswith('java') + +is_android = hasattr(sys, 'getandroidapilevel') + +if sys.platform != 'win32': + unix_shell = '/system/bin/sh' if is_android else '/bin/sh' +else: + unix_shell = None + +# Filename used for testing +if os.name == 'java': + # Jython disallows @ in module names + TESTFN = '$test' +else: + TESTFN = '@test' + +# Disambiguate TESTFN for parallel testing, while letting it remain a valid +# module name. +TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) + +# Define the URL of a dedicated HTTP server for the network tests. +# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. +TEST_HTTP_URL = "http://www.pythontest.net" + +# FS_NONASCII: non-ASCII character encodable by os.fsencode(), +# or None if there is no such character. +FS_NONASCII = None +for character in ( + # First try printable and common characters to have a readable filename. + # For each character, the encoding list are just example of encodings able + # to encode the character (the list is not exhaustive). + + # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 + '\u00E6', + # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 + '\u0130', + # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 + '\u0141', + # U+03C6 (Greek Small Letter Phi): cp1253 + '\u03C6', + # U+041A (Cyrillic Capital Letter Ka): cp1251 + '\u041A', + # U+05D0 (Hebrew Letter Alef): Encodable to cp424 + '\u05D0', + # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic + '\u060C', + # U+062A (Arabic Letter Teh): cp720 + '\u062A', + # U+0E01 (Thai Character Ko Kai): cp874 + '\u0E01', + + # Then try more "special" characters. "special" because they may be + # interpreted or displayed differently depending on the exact locale + # encoding and the font. + + # U+00A0 (No-Break Space) + '\u00A0', + # U+20AC (Euro Sign) + '\u20AC', +): + try: + # If Python is set up to use the legacy 'mbcs' in Windows, + # 'replace' error mode is used, and encode() returns b'?' + # for characters missing in the ANSI codepage + if os.fsdecode(os.fsencode(character)) != character: + raise UnicodeError + except UnicodeError: + pass + else: + FS_NONASCII = character + break + +# TESTFN_UNICODE is a non-ascii filename +TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" +if sys.platform == 'darwin': + # In Mac OS X's VFS API file names are, by definition, canonically + # decomposed Unicode, encoded using UTF-8. See QA1173: + # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html + import unicodedata + TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) +TESTFN_ENCODING = sys.getfilesystemencoding() + +# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be +# encoded by the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename. +TESTFN_UNENCODABLE = None +if os.name == 'nt': + # skip win32s (0) or Windows 9x/ME (1) + if sys.getwindowsversion().platform >= 2: + # Different kinds of characters from various languages to minimize the + # probability that the whole name is encodable to MBCS (issue #9819) + TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" + try: + TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) + except UnicodeEncodeError: + pass + else: + print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' + 'Unicode filename tests may not be effective' + % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) + TESTFN_UNENCODABLE = None +# Mac OS X denies unencodable filenames (invalid utf-8) +elif sys.platform != 'darwin': + try: + # ascii and utf-8 cannot encode the byte 0xff + b'\xff'.decode(TESTFN_ENCODING) + except UnicodeDecodeError: + # 0xff will be encoded using the surrogate character u+DCFF + TESTFN_UNENCODABLE = TESTFN \ + + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') + else: + # File system encoding (eg. ISO-8859-* encodings) can encode + # the byte 0xff. Skip some unicode filename tests. + pass + +# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be +# decoded from the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename (ex: the latin1 encoding can decode any byte +# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks +# to the surrogateescape error handler (PEP 383), but not from the filesystem +# encoding in strict mode. +TESTFN_UNDECODABLE = None +for name in ( + # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows + # accepts it to create a file or a directory, or don't accept to enter to + # such directory (when the bytes name is used). So test b'\xe7' first: it is + # not decodable from cp932. + b'\xe7w\xf0', + # undecodable from ASCII, UTF-8 + b'\xff', + # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 + # and cp857 + b'\xae\xd5' + # undecodable from UTF-8 (UNIX and Mac OS X) + b'\xed\xb2\x80', b'\xed\xb4\x80', + # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, + # cp1253, cp1254, cp1255, cp1257, cp1258 + b'\x81\x98', +): + try: + name.decode(TESTFN_ENCODING) + except UnicodeDecodeError: + TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name + break + +if FS_NONASCII: + TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII +else: + TESTFN_NONASCII = None + +# Save the initial cwd +SAVEDCWD = os.getcwd() + +# Set by libregrtest/main.py so we can skip tests that are not +# useful for PGO +PGO = False + +@contextlib.contextmanager +def temp_dir(path=None, quiet=False): + """Return a context manager that creates a temporary directory. + + Arguments: + + path: the directory to create temporarily. If omitted or None, + defaults to creating a temporary directory using tempfile.mkdtemp. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, if the path is specified and cannot be + created, only a warning is issued. + + """ + dir_created = False + if path is None: + path = tempfile.mkdtemp() + dir_created = True + path = os.path.realpath(path) + else: + try: + os.mkdir(path) + dir_created = True + except OSError as exc: + if not quiet: + raise + warnings.warn(f'tests may fail, unable to create ' + f'temporary directory {path!r}: {exc}', + RuntimeWarning, stacklevel=3) + if dir_created: + pid = os.getpid() + try: + yield path + finally: + # In case the process forks, let only the parent remove the + # directory. The child has a different process id. (bpo-30028) + if dir_created and pid == os.getpid(): + rmtree(path) + +@contextlib.contextmanager +def change_cwd(path, quiet=False): + """Return a context manager that changes the current working directory. + + Arguments: + + path: the directory to use as the temporary current working directory. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, it issues only a warning and keeps the current + working directory the same. + + """ + saved_dir = os.getcwd() + try: + os.chdir(path) + except OSError as exc: + if not quiet: + raise + warnings.warn(f'tests may fail, unable to change the current working ' + f'directory to {path!r}: {exc}', + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + + +@contextlib.contextmanager +def temp_cwd(name='tempcwd', quiet=False): + """ + Context manager that temporarily creates and changes the CWD. + + The function temporarily changes the current working directory + after creating a temporary directory in the current directory with + name *name*. If *name* is None, the temporary directory is + created using tempfile.mkdtemp. + + If *quiet* is False (default) and it is not possible to + create or change the CWD, an error is raised. If *quiet* is True, + only a warning is raised and the original CWD is used. + + """ + with temp_dir(path=name, quiet=quiet) as temp_path: + with change_cwd(temp_path, quiet=quiet) as cwd_dir: + yield cwd_dir + +if hasattr(os, "umask"): + @contextlib.contextmanager + def temp_umask(umask): + """Context manager that temporarily sets the process umask.""" + oldmask = os.umask(umask) + try: + yield + finally: + os.umask(oldmask) + +# TEST_HOME_DIR refers to the top level directory of the "test" package +# that contains Python's regression test suite +TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) + +# TEST_DATA_DIR is used as a target download location for remote resources +TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") + +def findfile(filename, subdir=None): + """Try to find a file on sys.path or in the test directory. If it is not + found the argument passed to the function is returned (this does not + necessarily signal failure; could still be the legitimate path). + + Setting *subdir* indicates a relative path to use to find the file + rather than looking directly in the path directories. + """ + if os.path.isabs(filename): + return filename + if subdir is not None: + filename = os.path.join(subdir, filename) + path = [TEST_HOME_DIR] + sys.path + for dn in path: + fn = os.path.join(dn, filename) + if os.path.exists(fn): return fn + return filename + +def create_empty_file(filename): + """Create an empty file. If the file already exists, truncate it.""" + fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + os.close(fd) + +def sortdict(dict): + "Like repr(dict), but in sorted order." + items = sorted(dict.items()) + reprpairs = ["%r: %r" % pair for pair in items] + withcommas = ", ".join(reprpairs) + return "{%s}" % withcommas + +def make_bad_fd(): + """ + Create an invalid file descriptor by opening and closing a file and return + its fd. + """ + file = open(TESTFN, "wb") + try: + return file.fileno() + finally: + file.close() + unlink(TESTFN) + + +def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): + with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: + compile(statement, '', 'exec') + err = cm.exception + testcase.assertIsNotNone(err.lineno) + if lineno is not None: + testcase.assertEqual(err.lineno, lineno) + testcase.assertIsNotNone(err.offset) + if offset is not None: + testcase.assertEqual(err.offset, offset) + +def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None): + # Test also that a warning is emitted only once. + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter('always', SyntaxWarning) + compile(statement, '', 'exec') + testcase.assertEqual(len(warns), 1, warns) + + warn, = warns + testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category) + if errtext: + testcase.assertRegex(str(warn.message), errtext) + testcase.assertEqual(warn.filename, '') + testcase.assertIsNotNone(warn.lineno) + if lineno is not None: + testcase.assertEqual(warn.lineno, lineno) + + # SyntaxWarning should be converted to SyntaxError when raised, + # since the latter contains more information and provides better + # error report. + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter('error', SyntaxWarning) + check_syntax_error(testcase, statement, errtext, + lineno=lineno, offset=offset) + # No warnings are leaked when a SyntaxError is raised. + testcase.assertEqual(warns, []) + + +def open_urlresource(url, *args, **kw): + import urllib.request, urllib.parse + + check = kw.pop('check', None) + + filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! + + fn = os.path.join(TEST_DATA_DIR, filename) + + def check_valid_file(fn): + f = open(fn, *args, **kw) + if check is None: + return f + elif check(f): + f.seek(0) + return f + f.close() + + if os.path.exists(fn): + f = check_valid_file(fn) + if f is not None: + return f + unlink(fn) + + # Verify the requirement before downloading the file + requires('urlfetch') + + if verbose: + print('\tfetching %s ...' % url, file=get_original_stdout()) + opener = urllib.request.build_opener() + if gzip: + opener.addheaders.append(('Accept-Encoding', 'gzip')) + f = opener.open(url, timeout=15) + if gzip and f.headers.get('Content-Encoding') == 'gzip': + f = gzip.GzipFile(fileobj=f) + try: + with open(fn, "wb") as out: + s = f.read() + while s: + out.write(s) + s = f.read() + finally: + f.close() + + f = check_valid_file(fn) + if f is not None: + return f + raise TestFailed('invalid resource %r' % fn) + + +class WarningsRecorder(object): + """Convenience wrapper for the warnings list returned on + entry to the warnings.catch_warnings() context manager. + """ + def __init__(self, warnings_list): + self._warnings = warnings_list + self._last = 0 + + def __getattr__(self, attr): + if len(self._warnings) > self._last: + return getattr(self._warnings[-1], attr) + elif attr in warnings.WarningMessage._WARNING_DETAILS: + return None + raise AttributeError("%r has no attribute %r" % (self, attr)) + + @property + def warnings(self): + return self._warnings[self._last:] + + def reset(self): + self._last = len(self._warnings) + + +def _filterwarnings(filters, quiet=False): + """Catch the warnings, then check if all the expected + warnings have been raised and re-raise unexpected warnings. + If 'quiet' is True, only re-raise the unexpected warnings. + """ + # Clear the warning registry of the calling module + # in order to re-raise the warnings. + frame = sys._getframe(2) + registry = frame.f_globals.get('__warningregistry__') + if registry: + registry.clear() + with warnings.catch_warnings(record=True) as w: + # Set filter "always" to record all warnings. Because + # test_warnings swap the module, we need to look up in + # the sys.modules dictionary. + sys.modules['warnings'].simplefilter("always") + yield WarningsRecorder(w) + # Filter the recorded warnings + reraise = list(w) + missing = [] + for msg, cat in filters: + seen = False + for w in reraise[:]: + warning = w.message + # Filter out the matching messages + if (re.match(msg, str(warning), re.I) and + issubclass(warning.__class__, cat)): + seen = True + reraise.remove(w) + if not seen and not quiet: + # This filter caught nothing + missing.append((msg, cat.__name__)) + if reraise: + raise AssertionError("unhandled warning %s" % reraise[0]) + if missing: + raise AssertionError("filter (%r, %s) did not catch any warning" % + missing[0]) + + +@contextlib.contextmanager +def check_warnings(*filters, **kwargs): + """Context manager to silence warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default True without argument, + default False if some filters are defined) + + Without argument, it defaults to: + check_warnings(("", Warning), quiet=True) + """ + quiet = kwargs.get('quiet') + if not filters: + filters = (("", Warning),) + # Preserve backward compatibility + if quiet is None: + quiet = True + return _filterwarnings(filters, quiet) + + +@contextlib.contextmanager +def check_no_warnings(testcase, message='', category=Warning, force_gc=False): + """Context manager to check that no warnings are emitted. + + This context manager enables a given warning within its scope + and checks that no warnings are emitted even with that warning + enabled. + + If force_gc is True, a garbage collection is attempted before checking + for warnings. This may help to catch warnings emitted when objects + are deleted, such as ResourceWarning. + + Other keyword arguments are passed to warnings.filterwarnings(). + """ + with warnings.catch_warnings(record=True) as warns: + warnings.filterwarnings('always', + message=message, + category=category) + yield + if force_gc: + gc_collect() + testcase.assertEqual(warns, []) + + +@contextlib.contextmanager +def check_no_resource_warning(testcase): + """Context manager to check that no ResourceWarning is emitted. + + Usage: + + with check_no_resource_warning(self): + f = open(...) + ... + del f + + You must remove the object which may emit ResourceWarning before + the end of the context manager. + """ + with check_no_warnings(testcase, category=ResourceWarning, force_gc=True): + yield + + +class CleanImport(object): + """Context manager to force import to return a new module reference. + + This is useful for testing module-level behaviours, such as + the emission of a DeprecationWarning on import. + + Use like this: + + with CleanImport("foo"): + importlib.import_module("foo") # new reference + """ + + def __init__(self, *module_names): + self.original_modules = sys.modules.copy() + for module_name in module_names: + if module_name in sys.modules: + module = sys.modules[module_name] + # It is possible that module_name is just an alias for + # another module (e.g. stub for modules renamed in 3.x). + # In that case, we also need delete the real module to clear + # the import cache. + if module.__name__ != module_name: + del sys.modules[module.__name__] + del sys.modules[module_name] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.modules.update(self.original_modules) + + +class EnvironmentVarGuard(collections.abc.MutableMapping): + + """Class to help protect the environment variable properly. Can be used as + a context manager.""" + + def __init__(self): + self._environ = os.environ + self._changed = {} + + def __getitem__(self, envvar): + return self._environ[envvar] + + def __setitem__(self, envvar, value): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + self._environ[envvar] = value + + def __delitem__(self, envvar): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + if envvar in self._environ: + del self._environ[envvar] + + def keys(self): + return self._environ.keys() + + def __iter__(self): + return iter(self._environ) + + def __len__(self): + return len(self._environ) + + def set(self, envvar, value): + self[envvar] = value + + def unset(self, envvar): + del self[envvar] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + for (k, v) in self._changed.items(): + if v is None: + if k in self._environ: + del self._environ[k] + else: + self._environ[k] = v + os.environ = self._environ + + +class DirsOnSysPath(object): + """Context manager to temporarily add directories to sys.path. + + This makes a copy of sys.path, appends any directories given + as positional arguments, then reverts sys.path to the copied + settings when the context ends. + + Note that *all* sys.path modifications in the body of the + context manager, including replacement of the object, + will be reverted at the end of the block. + """ + + def __init__(self, *paths): + self.original_value = sys.path[:] + self.original_object = sys.path + sys.path.extend(paths) + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.path = self.original_object + sys.path[:] = self.original_value + + +class TransientResource(object): + + """Raise ResourceDenied if an exception is raised while the context manager + is in effect that matches the specified exception and attributes.""" + + def __init__(self, exc, **kwargs): + self.exc = exc + self.attrs = kwargs + + def __enter__(self): + return self + + def __exit__(self, type_=None, value=None, traceback=None): + """If type_ is a subclass of self.exc and value has attributes matching + self.attrs, raise ResourceDenied. Otherwise let the exception + propagate (if any).""" + if type_ is not None and issubclass(self.exc, type_): + for attr, attr_value in self.attrs.items(): + if not hasattr(value, attr): + break + if getattr(value, attr) != attr_value: + break + else: + raise ResourceDenied("an optional resource is not available") + +# Context managers that raise ResourceDenied when various issues +# with the Internet connection manifest themselves as exceptions. +# XXX deprecate these and use transient_internet() instead +time_out = TransientResource(OSError, errno=errno.ETIMEDOUT) +socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) +ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) + + +def get_socket_conn_refused_errs(): + """ + Get the different socket error numbers ('errno') which can be received + when a connection is refused. + """ + errors = [errno.ECONNREFUSED] + if hasattr(errno, 'ENETUNREACH'): + # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED + errors.append(errno.ENETUNREACH) + if hasattr(errno, 'EADDRNOTAVAIL'): + # bpo-31910: socket.create_connection() fails randomly + # with EADDRNOTAVAIL on Travis CI + errors.append(errno.EADDRNOTAVAIL) + return errors + + +@contextlib.contextmanager +def transient_internet(resource_name, *, timeout=30.0, errnos=()): + """Return a context manager that raises ResourceDenied when various issues + with the Internet connection manifest themselves as exceptions.""" + default_errnos = [ + ('ECONNREFUSED', 111), + ('ECONNRESET', 104), + ('EHOSTUNREACH', 113), + ('ENETUNREACH', 101), + ('ETIMEDOUT', 110), + # socket.create_connection() fails randomly with + # EADDRNOTAVAIL on Travis CI. + ('EADDRNOTAVAIL', 99), + ] + default_gai_errnos = [ + ('EAI_AGAIN', -3), + ('EAI_FAIL', -4), + ('EAI_NONAME', -2), + ('EAI_NODATA', -5), + # Encountered when trying to resolve IPv6-only hostnames + ('WSANO_DATA', 11004), + ] + + denied = ResourceDenied("Resource %r is not available" % resource_name) + captured_errnos = errnos + gai_errnos = [] + if not captured_errnos: + captured_errnos = [getattr(errno, name, num) + for (name, num) in default_errnos] + gai_errnos = [getattr(socket, name, num) + for (name, num) in default_gai_errnos] + + def filter_error(err): + n = getattr(err, 'errno', None) + if (isinstance(err, socket.timeout) or + (isinstance(err, socket.gaierror) and n in gai_errnos) or + (isinstance(err, urllib.error.HTTPError) and + 500 <= err.code <= 599) or + (isinstance(err, urllib.error.URLError) and + (("ConnectionRefusedError" in err.reason) or + ("TimeoutError" in err.reason) or + ("EOFError" in err.reason))) or + n in captured_errnos): + if not verbose: + sys.stderr.write(denied.args[0] + "\n") + raise denied from err + + old_timeout = socket.getdefaulttimeout() + try: + if timeout is not None: + socket.setdefaulttimeout(timeout) + yield + except nntplib.NNTPTemporaryError as err: + if verbose: + sys.stderr.write(denied.args[0] + "\n") + raise denied from err + except OSError as err: + # urllib can wrap original socket errors multiple times (!), we must + # unwrap to get at the original error. + while True: + a = err.args + if len(a) >= 1 and isinstance(a[0], OSError): + err = a[0] + # The error can also be wrapped as args[1]: + # except socket.error as msg: + # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2]) + elif len(a) >= 2 and isinstance(a[1], OSError): + err = a[1] + else: + break + filter_error(err) + raise + # XXX should we catch generic exceptions and look for their + # __cause__ or __context__? + finally: + socket.setdefaulttimeout(old_timeout) + + +@contextlib.contextmanager +def captured_output(stream_name): + """Return a context manager used by captured_stdout/stdin/stderr + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + import io + orig_stdout = getattr(sys, stream_name) + setattr(sys, stream_name, io.StringIO()) + try: + yield getattr(sys, stream_name) + finally: + setattr(sys, stream_name, orig_stdout) + +def captured_stdout(): + """Capture the output of sys.stdout: + + with captured_stdout() as stdout: + print("hello") + self.assertEqual(stdout.getvalue(), "hello\\n") + """ + return captured_output("stdout") + +def captured_stderr(): + """Capture the output of sys.stderr: + + with captured_stderr() as stderr: + print("hello", file=sys.stderr) + self.assertEqual(stderr.getvalue(), "hello\\n") + """ + return captured_output("stderr") + +def captured_stdin(): + """Capture the input to sys.stdin: + + with captured_stdin() as stdin: + stdin.write('hello\\n') + stdin.seek(0) + # call test code that consumes from sys.stdin + captured = input() + self.assertEqual(captured, "hello") + """ + return captured_output("stdin") + + +def gc_collect(): + """Force as many objects as possible to be collected. + + In non-CPython implementations of Python, this is needed because timely + deallocation is not guaranteed by the garbage collector. (Even in CPython + this can be the case in case of reference cycles.) This means that __del__ + methods may be called later than expected and weakrefs may remain alive for + longer than expected. This function tries its best to force all garbage + objects to disappear. + """ + gc.collect() + if is_jython: + time.sleep(0.1) + gc.collect() + gc.collect() + +@contextlib.contextmanager +def disable_gc(): + have_gc = gc.isenabled() + gc.disable() + try: + yield + finally: + if have_gc: + gc.enable() + + +def python_is_optimized(): + """Find if Python was built with optimizations.""" + cflags = sysconfig.get_config_var('PY_CFLAGS') or '' + final_opt = "" + for opt in cflags.split(): + if opt.startswith('-O'): + final_opt = opt + return final_opt not in ('', '-O0', '-Og') + + +_header = 'nP' +_align = '0n' +if hasattr(sys, "getobjects"): + _header = '2P' + _header + _align = '0P' +_vheader = _header + 'n' + +def calcobjsize(fmt): + return struct.calcsize(_header + fmt + _align) + +def calcvobjsize(fmt): + return struct.calcsize(_vheader + fmt + _align) + + +_TPFLAGS_HAVE_GC = 1<<14 +_TPFLAGS_HEAPTYPE = 1<<9 + +def check_sizeof(test, o, size): + import _testcapi + result = sys.getsizeof(o) + # add GC header size + if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ + ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): + size += _testcapi.SIZEOF_PYGC_HEAD + msg = 'wrong size for %s: got %d, expected %d' \ + % (type(o), result, size) + test.assertEqual(result, size, msg) + +#======================================================================= +# Decorator for running a function in a different locale, correctly resetting +# it afterwards. + +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Decorator for running a function in a specific timezone, correctly +# resetting it afterwards. + +def run_with_tz(tz): + def decorator(func): + def inner(*args, **kwds): + try: + tzset = time.tzset + except AttributeError: + raise unittest.SkipTest("tzset required") + if 'TZ' in os.environ: + orig_tz = os.environ['TZ'] + else: + orig_tz = None + os.environ['TZ'] = tz + tzset() + + # now run the function, resetting the tz on exceptions + try: + return func(*args, **kwds) + finally: + if orig_tz is None: + del os.environ['TZ'] + else: + os.environ['TZ'] = orig_tz + time.tzset() + + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use +# should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G +_4G = 4 * _1G + +MAX_Py_ssize_t = sys.maxsize + +def set_memlimit(limit): + global max_memuse + global real_max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + real_max_memuse = memlimit + if memlimit > MAX_Py_ssize_t: + memlimit = MAX_Py_ssize_t + if memlimit < _2G - 1: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +class _MemoryWatchdog: + """An object which periodically watches the process' memory consumption + and prints it out. + """ + + def __init__(self): + self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) + self.started = False + + def start(self): + try: + f = open(self.procfile, 'r') + except OSError as e: + warnings.warn('/proc not available for stats: {}'.format(e), + RuntimeWarning) + sys.stderr.flush() + return + + with f: + watchdog_script = findfile("memory_watchdog.py") + self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], + stdin=f, + stderr=subprocess.DEVNULL) + self.started = True + + def stop(self): + if self.started: + self.mem_watchdog.terminate() + self.mem_watchdog.wait() + + +def bigmemtest(size, memuse, dry_run=True): + """Decorator for bigmem tests. + + 'size' is a requested size for the test (in arbitrary, test-interpreted + units.) 'memuse' is the number of bytes per unit for the test, or a good + estimate of it. For example, a test that needs two byte buffers, of 4 GiB + each, could be decorated with @bigmemtest(size=_4G, memuse=2). + + The 'size' argument is normally passed to the decorated test method as an + extra argument. If 'dry_run' is true, the value passed to the test method + may be less than the requested value. If 'dry_run' is false, it means the + test doesn't support dummy runs when -M is not specified. + """ + def decorator(f): + def wrapper(self): + size = wrapper.size + memuse = wrapper.memuse + if not real_max_memuse: + maxsize = 5147 + else: + maxsize = size + + if ((real_max_memuse or not dry_run) + and real_max_memuse < maxsize * memuse): + raise unittest.SkipTest( + "not enough memory: %.1fG minimum needed" + % (size * memuse / (1024 ** 3))) + + if real_max_memuse and verbose: + print() + print(" ... expected peak memory use: {peak:.1f}G" + .format(peak=size * memuse / (1024 ** 3))) + watchdog = _MemoryWatchdog() + watchdog.start() + else: + watchdog = None + + try: + return f(self, maxsize) + finally: + if watchdog: + watchdog.stop() + + wrapper.size = size + wrapper.memuse = memuse + return wrapper + return decorator + +def bigaddrspacetest(f): + """Decorator for tests that fill the address space.""" + def wrapper(self): + if max_memuse < MAX_Py_ssize_t: + if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: + raise unittest.SkipTest( + "not enough memory: try a 32-bit build instead") + else: + raise unittest.SkipTest( + "not enough memory: %.1fG minimum needed" + % (MAX_Py_ssize_t / (1024 ** 3))) + else: + return f(self) + return wrapper + +#======================================================================= +# unittest integration. + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result + +def _id(obj): + return obj + +def requires_resource(resource): + if resource == 'gui' and not _is_gui_available(): + return unittest.skip(_is_gui_available.reason) + if is_resource_enabled(resource): + return _id + else: + return unittest.skip("resource {0!r} is not enabled".format(resource)) + +def cpython_only(test): + """ + Decorator for tests only applicable on CPython. + """ + return impl_detail(cpython=True)(test) + +def impl_detail(msg=None, **guards): + if check_impl_detail(**guards): + return _id + if msg is None: + guardnames, default = _parse_guards(guards) + if default: + msg = "implementation detail not available on {0}" + else: + msg = "implementation detail specific to {0}" + guardnames = sorted(guardnames.keys()) + msg = msg.format(' or '.join(guardnames)) + return unittest.skip(msg) + +def _parse_guards(guards): + # Returns a tuple ({platform_name: run_me}, default_value) + if not guards: + return ({'cpython': True}, False) + is_true = list(guards.values())[0] + assert list(guards.values()) == [is_true] * len(guards) # all True or all False + return (guards, not is_true) + +# Use the following check to guard CPython's implementation-specific tests -- +# or to run them only on the implementation(s) guarded by the arguments. +def check_impl_detail(**guards): + """This function returns True or False depending on the host platform. + Examples: + if check_impl_detail(): # only on CPython (default) + if check_impl_detail(jython=True): # only on Jython + if check_impl_detail(cpython=False): # everywhere except on CPython + """ + guards, default = _parse_guards(guards) + return guards.get(platform.python_implementation().lower(), default) + + +def no_tracing(func): + """Decorator to temporarily turn off tracing for the duration of a test.""" + if not hasattr(sys, 'gettrace'): + return func + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + original_trace = sys.gettrace() + try: + sys.settrace(None) + return func(*args, **kwargs) + finally: + sys.settrace(original_trace) + return wrapper + + +def refcount_test(test): + """Decorator for tests which involve reference counting. + + To start, the decorator does not run the test if is not run by CPython. + After that, any trace function is unset during the test to prevent + unexpected refcounts caused by the trace function. + + """ + return no_tracing(cpython_only(test)) + + +def _filter_suite(suite, pred): + """Recursively filter test cases in a suite based on a predicate.""" + newtests = [] + for test in suite._tests: + if isinstance(test, unittest.TestSuite): + _filter_suite(test, pred) + newtests.append(test) + else: + if pred(test): + newtests.append(test) + suite._tests = newtests + +def _run_suite(suite): + """Run tests from a unittest.TestSuite-derived class.""" + runner = get_test_runner(sys.stdout, + verbosity=verbose, + capture_output=(junit_xml_list is not None)) + + result = runner.run(suite) + + if junit_xml_list is not None: + junit_xml_list.append(result.get_xml_element()) + + if not result.testsRun and not result.skipped: + raise TestDidNotRun + if not result.wasSuccessful(): + if len(result.errors) == 1 and not result.failures: + err = result.errors[0][1] + elif len(result.failures) == 1 and not result.errors: + err = result.failures[0][1] + else: + err = "multiple errors occurred" + if not verbose: err += "; run in verbose mode for details" + raise TestFailed(err) + + +# By default, don't filter tests +_match_test_func = None +_match_test_patterns = None + + +def match_test(test): + # Function used by support.run_unittest() and regrtest --list-cases + if _match_test_func is None: + return True + else: + return _match_test_func(test.id()) + + +def _is_full_match_test(pattern): + # If a pattern contains at least one dot, it's considered + # as a full test identifier. + # Example: 'test.test_os.FileTests.test_access'. + # + # Reject patterns which contain fnmatch patterns: '*', '?', '[...]' + # or '[!...]'. For example, reject 'test_access*'. + return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) + + +def set_match_tests(patterns): + global _match_test_func, _match_test_patterns + + if patterns == _match_test_patterns: + # No change: no need to recompile patterns. + return + + if not patterns: + func = None + # set_match_tests(None) behaves as set_match_tests(()) + patterns = () + elif all(map(_is_full_match_test, patterns)): + # Simple case: all patterns are full test identifier. + # The test.bisect_cmd utility only uses such full test identifiers. + func = set(patterns).__contains__ + else: + regex = '|'.join(map(fnmatch.translate, patterns)) + # The search *is* case sensitive on purpose: + # don't use flags=re.IGNORECASE + regex_match = re.compile(regex).match + + def match_test_regex(test_id): + if regex_match(test_id): + # The regex matches the whole identifier, for example + # 'test.test_os.FileTests.test_access'. + return True + else: + # Try to match parts of the test identifier. + # For example, split 'test.test_os.FileTests.test_access' + # into: 'test', 'test_os', 'FileTests' and 'test_access'. + return any(map(regex_match, test_id.split("."))) + + func = match_test_regex + + # Create a copy since patterns can be mutable and so modified later + _match_test_patterns = tuple(patterns) + _match_test_func = func + + + +def run_unittest(*classes): + """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) + suite = unittest.TestSuite() + for cls in classes: + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(unittest.findTestCases(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): + suite.addTest(cls) + else: + suite.addTest(unittest.makeSuite(cls)) + _filter_suite(suite, match_test) + _run_suite(suite) + +#======================================================================= +# Check for the presence of docstrings. + +# Rather than trying to enumerate all the cases where docstrings may be +# disabled, we just check for that directly + +def _check_docstrings(): + """Just used to check if docstrings are enabled""" + +MISSING_C_DOCSTRINGS = (check_impl_detail() and + sys.platform != 'win32' and + not sysconfig.get_config_var('WITH_DOC_STRINGS')) + +HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and + not MISSING_C_DOCSTRINGS) + +requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, + "test requires docstrings") + + +#======================================================================= +# doctest driver. + +def run_doctest(module, verbosity=None, optionflags=0): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) + if f: + raise TestFailed("%d of %d doctests failed" % (f, t)) + if verbose: + print('doctest (%s) ... %d tests with zero failures' % + (module.__name__, t)) + return f, t + + +#======================================================================= +# Support for saving and restoring the imported modules. + +def modules_setup(): + return sys.modules.copy(), + +def modules_cleanup(oldmodules): + # Encoders/decoders are registered permanently within the internal + # codec cache. If we destroy the corresponding modules their + # globals will be set to None which will trip up the cached functions. + encodings = [(k, v) for k, v in sys.modules.items() + if k.startswith('encodings.')] + sys.modules.clear() + sys.modules.update(encodings) + # XXX: This kind of problem can affect more than just encodings. In particular + # extension modules (such as _ssl) don't cope with reloading properly. + # Really, test modules should be cleaning out the test specific modules they + # know they added (ala test_runpy) rather than relying on this function (as + # test_importhooks and test_pkg do currently). + # Implicitly imported *real* modules should be left alone (see issue 10556). + sys.modules.update(oldmodules) + +#======================================================================= +# Threading support to prevent reporting refleaks when running regrtest.py -R + +# Flag used by saved_test_environment of test.libregrtest.save_env, +# to check if a test modified the environment. The flag should be set to False +# before running a new test. +# +# For example, threading_cleanup() sets the flag is the function fails +# to cleanup threads. +environment_altered = False + +# NOTE: we use thread._count() rather than threading.enumerate() (or the +# moral equivalent thereof) because a threading.Thread object is still alive +# until its __bootstrap() method has returned, even after it has been +# unregistered from the threading module. +# thread._count(), on the other hand, only gets decremented *after* the +# __bootstrap() method has returned, which gives us reliable reference counts +# at the end of a test run. + +def threading_setup(): + return _thread._count(), threading._dangling.copy() + +def threading_cleanup(*original_values): + global environment_altered + + _MAX_COUNT = 100 + + for count in range(_MAX_COUNT): + values = _thread._count(), threading._dangling + if values == original_values: + break + + if not count: + # Display a warning at the first iteration + environment_altered = True + dangling_threads = values[1] + print("Warning -- threading_cleanup() failed to cleanup " + "%s threads (count: %s, dangling: %s)" + % (values[0] - original_values[0], + values[0], len(dangling_threads)), + file=sys.stderr) + for thread in dangling_threads: + print(f"Dangling thread: {thread!r}", file=sys.stderr) + sys.stderr.flush() + + # Don't hold references to threads + dangling_threads = None + values = None + + time.sleep(0.01) + gc_collect() + + +def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + """ + @functools.wraps(func) + def decorator(*args): + key = threading_setup() + try: + return func(*args) + finally: + threading_cleanup(*key) + return decorator + + +@contextlib.contextmanager +def wait_threads_exit(timeout=60.0): + """ + bpo-31234: Context manager to wait until all threads created in the with + statement exit. + + Use _thread.count() to check if threads exited. Indirectly, wait until + threads exit the internal t_bootstrap() C function of the _thread module. + + threading_setup() and threading_cleanup() are designed to emit a warning + if a test leaves running threads in the background. This context manager + is designed to cleanup threads started by the _thread.start_new_thread() + which doesn't allow to wait for thread exit, whereas thread.Thread has a + join() method. + """ + old_count = _thread._count() + try: + yield + finally: + start_time = time.monotonic() + deadline = start_time + timeout + while True: + count = _thread._count() + if count <= old_count: + break + if time.monotonic() > deadline: + dt = time.monotonic() - start_time + msg = (f"wait_threads() failed to cleanup {count - old_count} " + f"threads after {dt:.1f} seconds " + f"(count: {count}, old count: {old_count})") + raise AssertionError(msg) + time.sleep(0.010) + gc_collect() + + +def join_thread(thread, timeout=30.0): + """Join a thread. Raise an AssertionError if the thread is still alive + after timeout seconds. + """ + thread.join(timeout) + if thread.is_alive(): + msg = f"failed to join the thread in {timeout:.1f} seconds" + raise AssertionError(msg) + + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + global environment_altered + + # Need os.waitpid(-1, os.WNOHANG): Windows is not supported + if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): + return + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + while True: + try: + # Read the exit status of any child process which already completed + pid, status = os.waitpid(-1, os.WNOHANG) + except OSError: + break + + if pid == 0: + break + + print("Warning -- reap_children() reaped child process %s" + % pid, file=sys.stderr) + environment_altered = True + + +@contextlib.contextmanager +def start_threads(threads, unlock=None): + threads = list(threads) + started = [] + try: + try: + for t in threads: + t.start() + started.append(t) + except: + if verbose: + print("Can't start %d threads, only %d threads started" % + (len(threads), len(started))) + raise + yield + finally: + try: + if unlock: + unlock() + endtime = starttime = time.monotonic() + for timeout in range(1, 16): + endtime += 60 + for t in started: + t.join(max(endtime - time.monotonic(), 0.01)) + started = [t for t in started if t.is_alive()] + if not started: + break + if verbose: + print('Unable to join %d threads during a period of ' + '%d minutes' % (len(started), timeout)) + finally: + started = [t for t in started if t.is_alive()] + if started: + faulthandler.dump_traceback(sys.stdout) + raise AssertionError('Unable to join %d threads' % len(started)) + +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield real_val + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + if hasattr(obj, attr): + delattr(obj, attr) + +@contextlib.contextmanager +def swap_item(obj, item, new_val): + """Temporary swap out an item with a new object. + + Usage: + with swap_item(obj, "item", 5): + ... + + This will set obj["item"] to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `item` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if item in obj: + real_val = obj[item] + obj[item] = new_val + try: + yield real_val + finally: + obj[item] = real_val + else: + obj[item] = new_val + try: + yield + finally: + if item in obj: + del obj[item] + +def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip() + return stderr + +requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'), + 'types are immortal if COUNT_ALLOCS is defined') + +def args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags and sys.warnoptions.""" + return subprocess._args_from_interpreter_flags() + +def optim_args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + optimization settings in sys.flags.""" + return subprocess._optim_args_from_interpreter_flags() + +#============================================================ +# Support for assertions about logging. +#============================================================ + +class TestHandler(logging.handlers.BufferingHandler): + def __init__(self, matcher): + # BufferingHandler takes a "capacity" argument + # so as to know when to flush. As we're overriding + # shouldFlush anyway, we can set a capacity of zero. + # You can call flush() manually to clear out the + # buffer. + logging.handlers.BufferingHandler.__init__(self, 0) + self.matcher = matcher + + def shouldFlush(self): + return False + + def emit(self, record): + self.format(record) + self.buffer.append(record.__dict__) + + def matches(self, **kwargs): + """ + Look for a saved dict whose keys/values match the supplied arguments. + """ + result = False + for d in self.buffer: + if self.matcher.matches(d, **kwargs): + result = True + break + return result + +class Matcher(object): + + _partial_matches = ('msg', 'message') + + def matches(self, d, **kwargs): + """ + Try to match a single dict with the supplied arguments. + + Keys whose values are strings and which are in self._partial_matches + will be checked for partial (i.e. substring) matches. You can extend + this scheme to (for example) do regular expression matching, etc. + """ + result = True + for k in kwargs: + v = kwargs[k] + dv = d.get(k) + if not self.match_value(k, dv, v): + result = False + break + return result + + def match_value(self, k, dv, v): + """ + Try to match a single stored value (dv) with a supplied value (v). + """ + if type(v) != type(dv): + result = False + elif type(dv) is not str or k not in self._partial_matches: + result = (v == dv) + else: + result = dv.find(v) >= 0 + return result + + +_can_symlink = None +def can_symlink(): + global _can_symlink + if _can_symlink is not None: + return _can_symlink + symlink_path = TESTFN + "can_symlink" + try: + os.symlink(TESTFN, symlink_path) + can = True + except (OSError, NotImplementedError, AttributeError): + can = False + else: + os.remove(symlink_path) + _can_symlink = can + return can + +def skip_unless_symlink(test): + """Skip decorator for tests that require functional symlink""" + ok = can_symlink() + msg = "Requires functional symlink implementation" + return test if ok else unittest.skip(msg)(test) + +_can_xattr = None +def can_xattr(): + global _can_xattr + if _can_xattr is not None: + return _can_xattr + if not hasattr(os, "setxattr"): + can = False + else: + tmp_dir = tempfile.mkdtemp() + tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir) + try: + with open(TESTFN, "wb") as fp: + try: + # TESTFN & tempfile may use different file systems with + # different capabilities + os.setxattr(tmp_fp, b"user.test", b"") + os.setxattr(tmp_name, b"trusted.foo", b"42") + os.setxattr(fp.fileno(), b"user.test", b"") + # Kernels < 2.6.39 don't respect setxattr flags. + kernel_version = platform.release() + m = re.match(r"2.6.(\d{1,2})", kernel_version) + can = m is None or int(m.group(1)) >= 39 + except OSError: + can = False + finally: + unlink(TESTFN) + unlink(tmp_name) + rmdir(tmp_dir) + _can_xattr = can + return can + +def skip_unless_xattr(test): + """Skip decorator for tests that require functional extended attributes""" + ok = can_xattr() + msg = "no non-broken extended attribute support" + return test if ok else unittest.skip(msg)(test) + +_bind_nix_socket_error = None +def skip_unless_bind_unix_socket(test): + """Decorator for tests requiring a functional bind() for unix sockets.""" + if not hasattr(socket, 'AF_UNIX'): + return unittest.skip('No UNIX Sockets')(test) + global _bind_nix_socket_error + if _bind_nix_socket_error is None: + path = TESTFN + "can_bind_unix_socket" + with socket.socket(socket.AF_UNIX) as sock: + try: + sock.bind(path) + _bind_nix_socket_error = False + except OSError as e: + _bind_nix_socket_error = e + finally: + unlink(path) + if _bind_nix_socket_error: + msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error + return unittest.skip(msg)(test) + else: + return test + + +def fs_is_case_insensitive(directory): + """Detects if the file system for the specified directory is case-insensitive.""" + with tempfile.NamedTemporaryFile(dir=directory) as base: + base_path = base.name + case_path = base_path.upper() + if case_path == base_path: + case_path = base_path.lower() + try: + return os.path.samefile(base_path, case_path) + except FileNotFoundError: + return False + + +def detect_api_mismatch(ref_api, other_api, *, ignore=()): + """Returns the set of items in ref_api not in other_api, except for a + defined list of items to be ignored in this check. + + By default this skips private attributes beginning with '_' but + includes all magic methods, i.e. those starting and ending in '__'. + """ + missing_items = set(dir(ref_api)) - set(dir(other_api)) + if ignore: + missing_items -= set(ignore) + missing_items = set(m for m in missing_items + if not m.startswith('_') or m.endswith('__')) + return missing_items + + +def check__all__(test_case, module, name_of_module=None, extra=(), + blacklist=()): + """Assert that the __all__ variable of 'module' contains all public names. + + The module's public names (its API) are detected automatically based on + whether they match the public name convention and were defined in + 'module'. + + The 'name_of_module' argument can specify (as a string or tuple thereof) + what module(s) an API could be defined in in order to be detected as a + public API. One case for this is when 'module' imports part of its public + API from other modules, possibly a C backend (like 'csv' and its '_csv'). + + The 'extra' argument can be a set of names that wouldn't otherwise be + automatically detected as "public", like objects without a proper + '__module__' attribute. If provided, it will be added to the + automatically detected ones. + + The 'blacklist' argument can be a set of names that must not be treated + as part of the public API even though their names indicate otherwise. + + Usage: + import bar + import foo + import unittest + from test import support + + class MiscTestCase(unittest.TestCase): + def test__all__(self): + support.check__all__(self, foo) + + class OtherTestCase(unittest.TestCase): + def test__all__(self): + extra = {'BAR_CONST', 'FOO_CONST'} + blacklist = {'baz'} # Undocumented name. + # bar imports part of its API from _bar. + support.check__all__(self, bar, ('bar', '_bar'), + extra=extra, blacklist=blacklist) + + """ + + if name_of_module is None: + name_of_module = (module.__name__, ) + elif isinstance(name_of_module, str): + name_of_module = (name_of_module, ) + + expected = set(extra) + + for name in dir(module): + if name.startswith('_') or name in blacklist: + continue + obj = getattr(module, name) + if (getattr(obj, '__module__', None) in name_of_module or + (not hasattr(obj, '__module__') and + not isinstance(obj, types.ModuleType))): + expected.add(name) + test_case.assertCountEqual(module.__all__, expected) + + +class SuppressCrashReport: + """Try to prevent a crash report from popping up. + + On Windows, don't display the Windows Error Reporting dialog. On UNIX, + disable the creation of coredump file. + """ + old_value = None + old_modes = None + + def __enter__(self): + """On Windows, disable Windows Error Reporting dialogs using + SetErrorMode. + + On UNIX, try to save the previous core file size limit, then set + soft limit to 0. + """ + if sys.platform.startswith('win'): + # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx + # GetErrorMode is not available on Windows XP and Windows Server 2003, + # but SetErrorMode returns the previous value, so we can use that + import ctypes + self._k32 = ctypes.windll.kernel32 + SEM_NOGPFAULTERRORBOX = 0x02 + self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) + self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX) + + # Suppress assert dialogs in debug builds + # (see http://bugs.python.org/issue23314) + try: + import msvcrt + msvcrt.CrtSetReportMode + except (AttributeError, ImportError): + # no msvcrt or a release build + pass + else: + self.old_modes = {} + for report_type in [msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT]: + old_mode = msvcrt.CrtSetReportMode(report_type, + msvcrt.CRTDBG_MODE_FILE) + old_file = msvcrt.CrtSetReportFile(report_type, + msvcrt.CRTDBG_FILE_STDERR) + self.old_modes[report_type] = old_mode, old_file + + else: + if resource is not None: + try: + self.old_value = resource.getrlimit(resource.RLIMIT_CORE) + resource.setrlimit(resource.RLIMIT_CORE, + (0, self.old_value[1])) + except (ValueError, OSError): + pass + + if sys.platform == 'darwin': + # Check if the 'Crash Reporter' on OSX was configured + # in 'Developer' mode and warn that it will get triggered + # when it is. + # + # This assumes that this context manager is used in tests + # that might trigger the next manager. + cmd = ['/usr/bin/defaults', 'read', + 'com.apple.CrashReporter', 'DialogType'] + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + with proc: + stdout = proc.communicate()[0] + if stdout.strip() == b'developer': + print("this test triggers the Crash Reporter, " + "that is intentional", end='', flush=True) + + return self + + def __exit__(self, *ignore_exc): + """Restore Windows ErrorMode or core file behavior to initial value.""" + if self.old_value is None: + return + + if sys.platform.startswith('win'): + self._k32.SetErrorMode(self.old_value) + + if self.old_modes: + import msvcrt + for report_type, (old_mode, old_file) in self.old_modes.items(): + msvcrt.CrtSetReportMode(report_type, old_mode) + msvcrt.CrtSetReportFile(report_type, old_file) + else: + if resource is not None: + try: + resource.setrlimit(resource.RLIMIT_CORE, self.old_value) + except (ValueError, OSError): + pass + + +def patch(test_instance, object_to_patch, attr_name, new_value): + """Override 'object_to_patch'.'attr_name' with 'new_value'. + + Also, add a cleanup procedure to 'test_instance' to restore + 'object_to_patch' value for 'attr_name'. + The 'attr_name' should be a valid attribute for 'object_to_patch'. + + """ + # check that 'attr_name' is a real attribute for 'object_to_patch' + # will raise AttributeError if it does not exist + getattr(object_to_patch, attr_name) + + # keep a copy of the old value + attr_is_local = False + try: + old_value = object_to_patch.__dict__[attr_name] + except (AttributeError, KeyError): + old_value = getattr(object_to_patch, attr_name, None) + else: + attr_is_local = True + + # restore the value when the test is done + def cleanup(): + if attr_is_local: + setattr(object_to_patch, attr_name, old_value) + else: + delattr(object_to_patch, attr_name) + + test_instance.addCleanup(cleanup) + + # actually override the attribute + setattr(object_to_patch, attr_name, new_value) + + +def run_in_subinterp(code): + """ + Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc + module is enabled. + """ + # Issue #10915, #15751: PyGILState_*() functions don't work with + # sub-interpreters, the tracemalloc module uses these functions internally + try: + import tracemalloc + except ImportError: + pass + else: + if tracemalloc.is_tracing(): + raise unittest.SkipTest("run_in_subinterp() cannot be used " + "if tracemalloc module is tracing " + "memory allocations") + import _testcapi + return _testcapi.run_in_subinterp(code) + + +def check_free_after_iterating(test, iter, cls, args=()): + class A(cls): + def __del__(self): + nonlocal done + done = True + try: + next(it) + except StopIteration: + pass + + done = False + it = iter(A(*args)) + # Issue 26494: Shouldn't crash + test.assertRaises(StopIteration, next, it) + # The sequence should be deallocated just after the end of iterating + gc_collect() + test.assertTrue(done) + + +def missing_compiler_executable(cmd_names=[]): + """Check if the compiler components used to build the interpreter exist. + + Check for the existence of the compiler executables whose names are listed + in 'cmd_names' or all the compiler executables when 'cmd_names' is empty + and return the first missing executable or None when none is found + missing. + + """ + from distutils import ccompiler, sysconfig, spawn + compiler = ccompiler.new_compiler() + sysconfig.customize_compiler(compiler) + for name in compiler.executables: + if cmd_names and name not in cmd_names: + continue + cmd = getattr(compiler, name) + if cmd_names: + assert cmd is not None, \ + "the '%s' executable is not configured" % name + elif not cmd: + continue + if spawn.find_executable(cmd[0]) is None: + return cmd[0] + + +_is_android_emulator = None +def setswitchinterval(interval): + # Setting a very low gil interval on the Android emulator causes python + # to hang (issue #26939). + minimum_interval = 1e-5 + if is_android and interval < minimum_interval: + global _is_android_emulator + if _is_android_emulator is None: + _is_android_emulator = (subprocess.check_output( + ['getprop', 'ro.kernel.qemu']).strip() == b'1') + if _is_android_emulator: + interval = minimum_interval + return sys.setswitchinterval(interval) + + +@contextlib.contextmanager +def disable_faulthandler(): + # use sys.__stderr__ instead of sys.stderr, since regrtest replaces + # sys.stderr with a StringIO which has no file descriptor when a test + # is run with -W/--verbose3. + fd = sys.__stderr__.fileno() + + is_enabled = faulthandler.is_enabled() + try: + faulthandler.disable() + yield + finally: + if is_enabled: + faulthandler.enable(file=fd, all_threads=True) + + +def fd_count(): + """Count the number of open file descriptors. + """ + if sys.platform.startswith(('linux', 'freebsd')): + try: + names = os.listdir("/proc/self/fd") + # Substract one because listdir() opens internally a file + # descriptor to list the content of the /proc/self/fd/ directory. + return len(names) - 1 + except FileNotFoundError: + pass + + MAXFD = 256 + if hasattr(os, 'sysconf'): + try: + MAXFD = os.sysconf("SC_OPEN_MAX") + except OSError: + pass + + old_modes = None + if sys.platform == 'win32': + # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process + # on invalid file descriptor if Python is compiled in debug mode + try: + import msvcrt + msvcrt.CrtSetReportMode + except (AttributeError, ImportError): + # no msvcrt or a release build + pass + else: + old_modes = {} + for report_type in (msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT): + old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0) + + try: + count = 0 + for fd in range(MAXFD): + try: + # Prefer dup() over fstat(). fstat() can require input/output + # whereas dup() doesn't. + fd2 = os.dup(fd) + except OSError as e: + if e.errno != errno.EBADF: + raise + else: + os.close(fd2) + count += 1 + finally: + if old_modes is not None: + for report_type in (msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT): + msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) + + return count + + +class SaveSignals: + """ + Save and restore signal handlers. + + This class is only able to save/restore signal handlers registered + by the Python signal module: see bpo-13285 for "external" signal + handlers. + """ + + def __init__(self): + import signal + self.signal = signal + self.signals = signal.valid_signals() + # SIGKILL and SIGSTOP signals cannot be ignored nor caught + for signame in ('SIGKILL', 'SIGSTOP'): + try: + signum = getattr(signal, signame) + except AttributeError: + continue + self.signals.remove(signum) + self.handlers = {} + + def save(self): + for signum in self.signals: + handler = self.signal.getsignal(signum) + if handler is None: + # getsignal() returns None if a signal handler was not + # registered by the Python signal module, + # and the handler is not SIG_DFL nor SIG_IGN. + # + # Ignore the signal: we cannot restore the handler. + continue + self.handlers[signum] = handler + + def restore(self): + for signum, handler in self.handlers.items(): + self.signal.signal(signum, handler) + + +def with_pymalloc(): + import _testcapi + return _testcapi.WITH_PYMALLOC + + +class FakePath: + """Simple implementing of the path protocol. + """ + def __init__(self, path): + self.path = path + + def __repr__(self): + return f'' + + def __fspath__(self): + if (isinstance(self.path, BaseException) or + isinstance(self.path, type) and + issubclass(self.path, BaseException)): + raise self.path + else: + return self.path + + +def maybe_get_event_loop_policy(): + """Return the global event loop policy if one is set, else return None.""" + return asyncio.events._event_loop_policy + +# Helpers for testing hashing. +NHASHBITS = sys.hash_info.width # number of bits in hash() result +assert NHASHBITS in (32, 64) + +# Return mean and sdev of number of collisions when tossing nballs balls +# uniformly at random into nbins bins. By definition, the number of +# collisions is the number of balls minus the number of occupied bins at +# the end. +def collision_stats(nbins, nballs): + n, k = nbins, nballs + # prob a bin empty after k trials = (1 - 1/n)**k + # mean # empty is then n * (1 - 1/n)**k + # so mean # occupied is n - n * (1 - 1/n)**k + # so collisions = k - (n - n*(1 - 1/n)**k) + # + # For the variance: + # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = + # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) + # + # Massive cancellation occurs, and, e.g., for a 64-bit hash code + # 1-1/2**64 rounds uselessly to 1.0. Rather than make heroic (and + # error-prone) efforts to rework the naive formulas to avoid those, + # we use the `decimal` module to get plenty of extra precision. + # + # Note: the exact values are straightforward to compute with + # rationals, but in context that's unbearably slow, requiring + # multi-million bit arithmetic. + import decimal + with decimal.localcontext() as ctx: + bits = n.bit_length() * 2 # bits in n**2 + # At least that many bits will likely cancel out. + # Use that many decimal digits instead. + ctx.prec = max(bits, 30) + dn = decimal.Decimal(n) + p1empty = ((dn - 1) / dn) ** k + meanempty = n * p1empty + occupied = n - meanempty + collisions = k - occupied + var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) + return float(collisions), float(var.sqrt()) diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py new file mode 100644 index 0000000000..27a47f2c4e --- /dev/null +++ b/Lib/test/support/script_helper.py @@ -0,0 +1,263 @@ +# Common utility functions used by various script execution tests +# e.g. test_cmd_line, test_cmd_line_script and test_runpy + +import collections +import importlib +import sys +import os +import os.path +import subprocess +import py_compile +import zipfile + +from importlib.util import source_from_cache +from test.support import make_legacy_pyc, strip_python_stderr + + +# Cached result of the expensive test performed in the function below. +__cached_interp_requires_environment = None + +def interpreter_requires_environment(): + """ + Returns True if our sys.executable interpreter requires environment + variables in order to be able to run at all. + + This is designed to be used with @unittest.skipIf() to annotate tests + that need to use an assert_python*() function to launch an isolated + mode (-I) or no environment mode (-E) sub-interpreter process. + + A normal build & test does not run into this situation but it can happen + when trying to run the standard library test suite from an interpreter that + doesn't have an obvious home with Python's current home finding logic. + + Setting PYTHONHOME is one way to get most of the testsuite to run in that + situation. PYTHONPATH or PYTHONUSERSITE are other common environment + variables that might impact whether or not the interpreter can start. + """ + global __cached_interp_requires_environment + if __cached_interp_requires_environment is None: + # If PYTHONHOME is set, assume that we need it + if 'PYTHONHOME' in os.environ: + __cached_interp_requires_environment = True + return True + + # Try running an interpreter with -E to see if it works or not. + try: + subprocess.check_call([sys.executable, '-E', + '-c', 'import sys; sys.exit(0)']) + except subprocess.CalledProcessError: + __cached_interp_requires_environment = True + else: + __cached_interp_requires_environment = False + + return __cached_interp_requires_environment + + +class _PythonRunResult(collections.namedtuple("_PythonRunResult", + ("rc", "out", "err"))): + """Helper for reporting Python subprocess run results""" + def fail(self, cmd_line): + """Provide helpful details about failed subcommand runs""" + # Limit to 80 lines to ASCII characters + maxlen = 80 * 100 + out, err = self.out, self.err + if len(out) > maxlen: + out = b'(... truncated stdout ...)' + out[-maxlen:] + if len(err) > maxlen: + err = b'(... truncated stderr ...)' + err[-maxlen:] + out = out.decode('ascii', 'replace').rstrip() + err = err.decode('ascii', 'replace').rstrip() + raise AssertionError("Process return code is %d\n" + "command line: %r\n" + "\n" + "stdout:\n" + "---\n" + "%s\n" + "---\n" + "\n" + "stderr:\n" + "---\n" + "%s\n" + "---" + % (self.rc, cmd_line, + out, + err)) + + +# Executing the interpreter in a subprocess +def run_python_until_end(*args, **env_vars): + env_required = interpreter_requires_environment() + cwd = env_vars.pop('__cwd', None) + if '__isolated' in env_vars: + isolated = env_vars.pop('__isolated') + else: + isolated = not env_vars and not env_required + cmd_line = [sys.executable, '-X', 'faulthandler'] + if isolated: + # isolated mode: ignore Python environment variables, ignore user + # site-packages, and don't add the current directory to sys.path + cmd_line.append('-I') + elif not env_vars and not env_required: + # ignore Python environment variables + cmd_line.append('-E') + + # But a special flag that can be set to override -- in this case, the + # caller is responsible to pass the full environment. + if env_vars.pop('__cleanenv', None): + env = {} + if sys.platform == 'win32': + # Windows requires at least the SYSTEMROOT environment variable to + # start Python. + env['SYSTEMROOT'] = os.environ['SYSTEMROOT'] + + # Other interesting environment variables, not copied currently: + # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP. + else: + # Need to preserve the original environment, for in-place testing of + # shared library builds. + env = os.environ.copy() + + # set TERM='' unless the TERM environment variable is passed explicitly + # see issues #11390 and #18300 + if 'TERM' not in env_vars: + env['TERM'] = '' + + env.update(env_vars) + cmd_line.extend(args) + proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env, cwd=cwd) + with proc: + try: + out, err = proc.communicate() + finally: + proc.kill() + subprocess._cleanup() + rc = proc.returncode + err = strip_python_stderr(err) + return _PythonRunResult(rc, out, err), cmd_line + +def _assert_python(expected_success, *args, **env_vars): + res, cmd_line = run_python_until_end(*args, **env_vars) + if (res.rc and expected_success) or (not res.rc and not expected_success): + res.fail(cmd_line) + return res + +def assert_python_ok(*args, **env_vars): + """ + Assert that running the interpreter with `args` and optional environment + variables `env_vars` succeeds (rc == 0) and return a (return code, stdout, + stderr) tuple. + + If the __cleanenv keyword is set, env_vars is used as a fresh environment. + + Python is started in isolated mode (command line option -I), + except if the __isolated keyword is set to False. + """ + return _assert_python(True, *args, **env_vars) + +def assert_python_failure(*args, **env_vars): + """ + Assert that running the interpreter with `args` and optional environment + variables `env_vars` fails (rc != 0) and return a (return code, stdout, + stderr) tuple. + + See assert_python_ok() for more options. + """ + return _assert_python(False, *args, **env_vars) + +def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): + """Run a Python subprocess with the given arguments. + + kw is extra keyword args to pass to subprocess.Popen. Returns a Popen + object. + """ + cmd_line = [sys.executable] + if not interpreter_requires_environment(): + cmd_line.append('-E') + cmd_line.extend(args) + # Under Fedora (?), GNU readline can output junk on stderr when initialized, + # depending on the TERM setting. Setting TERM=vt100 is supposed to disable + # that. References: + # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html + # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import + # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html + env = kw.setdefault('env', dict(os.environ)) + env['TERM'] = 'vt100' + return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=stdout, stderr=stderr, + **kw) + +def kill_python(p): + """Run the given Popen process until completion and return stdout.""" + p.stdin.close() + data = p.stdout.read() + p.stdout.close() + # try to cleanup the child so we don't appear to leak when running + # with regrtest -R. + p.wait() + subprocess._cleanup() + return data + +def make_script(script_dir, script_basename, source, omit_suffix=False): + script_filename = script_basename + if not omit_suffix: + script_filename += os.extsep + 'py' + script_name = os.path.join(script_dir, script_filename) + # The script should be encoded to UTF-8, the default string encoding + with open(script_name, 'w', encoding='utf-8') as script_file: + script_file.write(source) + importlib.invalidate_caches() + return script_name + +def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): + zip_filename = zip_basename+os.extsep+'zip' + zip_name = os.path.join(zip_dir, zip_filename) + with zipfile.ZipFile(zip_name, 'w') as zip_file: + if name_in_zip is None: + parts = script_name.split(os.sep) + if len(parts) >= 2 and parts[-2] == '__pycache__': + legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) + name_in_zip = os.path.basename(legacy_pyc) + script_name = legacy_pyc + else: + name_in_zip = os.path.basename(script_name) + zip_file.write(script_name, name_in_zip) + #if test.support.verbose: + # with zipfile.ZipFile(zip_name, 'r') as zip_file: + # print 'Contents of %r:' % zip_name + # zip_file.printdir() + return zip_name, os.path.join(zip_name, name_in_zip) + +def make_pkg(pkg_dir, init_source=''): + os.mkdir(pkg_dir) + make_script(pkg_dir, '__init__', init_source) + +def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, + source, depth=1, compiled=False): + unlink = [] + init_name = make_script(zip_dir, '__init__', '') + unlink.append(init_name) + init_basename = os.path.basename(init_name) + script_name = make_script(zip_dir, script_basename, source) + unlink.append(script_name) + if compiled: + init_name = py_compile.compile(init_name, doraise=True) + script_name = py_compile.compile(script_name, doraise=True) + unlink.extend((init_name, script_name)) + pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)] + script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name)) + zip_filename = zip_basename+os.extsep+'zip' + zip_name = os.path.join(zip_dir, zip_filename) + with zipfile.ZipFile(zip_name, 'w') as zip_file: + for name in pkg_names: + init_name_in_zip = os.path.join(name, init_basename) + zip_file.write(init_name, init_name_in_zip) + zip_file.write(script_name, script_name_in_zip) + for name in unlink: + os.unlink(name) + #if test.support.verbose: + # with zipfile.ZipFile(zip_name, 'r') as zip_file: + # print 'Contents of %r:' % zip_name + # zip_file.printdir() + return zip_name, os.path.join(zip_name, script_name_in_zip) diff --git a/Lib/test/support/testresult.py b/Lib/test/support/testresult.py new file mode 100644 index 0000000000..67e126dcf7 --- /dev/null +++ b/Lib/test/support/testresult.py @@ -0,0 +1,204 @@ +'''Test runner and result class for the regression test suite. + +''' + +import functools +import io +import sys +import time +import traceback +import unittest + +import xml.etree.ElementTree as ET + +from datetime import datetime + +class RegressionTestResult(unittest.TextTestResult): + separator1 = '=' * 70 + '\n' + separator2 = '-' * 70 + '\n' + + def __init__(self, stream, descriptions, verbosity): + super().__init__(stream=stream, descriptions=descriptions, verbosity=0) + self.buffer = True + self.__suite = ET.Element('testsuite') + self.__suite.set('start', datetime.utcnow().isoformat(' ')) + + self.__e = None + self.__start_time = None + self.__results = [] + self.__verbose = bool(verbosity) + + @classmethod + def __getId(cls, test): + try: + test_id = test.id + except AttributeError: + return str(test) + try: + return test_id() + except TypeError: + return str(test_id) + return repr(test) + + def startTest(self, test): + super().startTest(test) + self.__e = e = ET.SubElement(self.__suite, 'testcase') + self.__start_time = time.perf_counter() + if self.__verbose: + self.stream.write(f'{self.getDescription(test)} ... ') + self.stream.flush() + + def _add_result(self, test, capture=False, **args): + e = self.__e + self.__e = None + if e is None: + return + e.set('name', args.pop('name', self.__getId(test))) + e.set('status', args.pop('status', 'run')) + e.set('result', args.pop('result', 'completed')) + if self.__start_time: + e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}') + + if capture: + if self._stdout_buffer is not None: + stdout = self._stdout_buffer.getvalue().rstrip() + ET.SubElement(e, 'system-out').text = stdout + if self._stderr_buffer is not None: + stderr = self._stderr_buffer.getvalue().rstrip() + ET.SubElement(e, 'system-err').text = stderr + + for k, v in args.items(): + if not k or not v: + continue + e2 = ET.SubElement(e, k) + if hasattr(v, 'items'): + for k2, v2 in v.items(): + if k2: + e2.set(k2, str(v2)) + else: + e2.text = str(v2) + else: + e2.text = str(v) + + def __write(self, c, word): + if self.__verbose: + self.stream.write(f'{word}\n') + + @classmethod + def __makeErrorDict(cls, err_type, err_value, err_tb): + if isinstance(err_type, type): + if err_type.__module__ == 'builtins': + typename = err_type.__name__ + else: + typename = f'{err_type.__module__}.{err_type.__name__}' + else: + typename = repr(err_type) + + msg = traceback.format_exception(err_type, err_value, None) + tb = traceback.format_exception(err_type, err_value, err_tb) + + return { + 'type': typename, + 'message': ''.join(msg), + '': ''.join(tb), + } + + def addError(self, test, err): + self._add_result(test, True, error=self.__makeErrorDict(*err)) + super().addError(test, err) + self.__write('E', 'ERROR') + + def addExpectedFailure(self, test, err): + self._add_result(test, True, output=self.__makeErrorDict(*err)) + super().addExpectedFailure(test, err) + self.__write('x', 'expected failure') + + def addFailure(self, test, err): + self._add_result(test, True, failure=self.__makeErrorDict(*err)) + super().addFailure(test, err) + self.__write('F', 'FAIL') + + def addSkip(self, test, reason): + self._add_result(test, skipped=reason) + super().addSkip(test, reason) + self.__write('S', f'skipped {reason!r}') + + def addSuccess(self, test): + self._add_result(test) + super().addSuccess(test) + self.__write('.', 'ok') + + def addUnexpectedSuccess(self, test): + self._add_result(test, outcome='UNEXPECTED_SUCCESS') + super().addUnexpectedSuccess(test) + self.__write('u', 'unexpected success') + + def printErrors(self): + if self.__verbose: + self.stream.write('\n') + self.printErrorList('ERROR', self.errors) + self.printErrorList('FAIL', self.failures) + + def printErrorList(self, flavor, errors): + for test, err in errors: + self.stream.write(self.separator1) + self.stream.write(f'{flavor}: {self.getDescription(test)}\n') + self.stream.write(self.separator2) + self.stream.write('%s\n' % err) + + def get_xml_element(self): + e = self.__suite + e.set('tests', str(self.testsRun)) + e.set('errors', str(len(self.errors))) + e.set('failures', str(len(self.failures))) + return e + +class QuietRegressionTestRunner: + def __init__(self, stream, buffer=False): + self.result = RegressionTestResult(stream, None, 0) + self.result.buffer = buffer + + def run(self, test): + test(self.result) + return self.result + +def get_test_runner_class(verbosity, buffer=False): + if verbosity: + return functools.partial(unittest.TextTestRunner, + resultclass=RegressionTestResult, + buffer=buffer, + verbosity=verbosity) + return functools.partial(QuietRegressionTestRunner, buffer=buffer) + +def get_test_runner(stream, verbosity, capture_output=False): + return get_test_runner_class(verbosity, capture_output)(stream) + +if __name__ == '__main__': + class TestTests(unittest.TestCase): + def test_pass(self): + pass + + def test_pass_slow(self): + time.sleep(1.0) + + def test_fail(self): + print('stdout', file=sys.stdout) + print('stderr', file=sys.stderr) + self.fail('failure message') + + def test_error(self): + print('stdout', file=sys.stdout) + print('stderr', file=sys.stderr) + raise RuntimeError('error message') + + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestTests)) + stream = io.StringIO() + runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv)) + runner = runner_cls(sys.stdout) + result = runner.run(suite) + print('Output:', stream.getvalue()) + print('XML: ', end='') + for s in ET.tostringlist(result.get_xml_element()): + print(s.decode(), end='') + print() diff --git a/Lib/test/test_bool.py b/Lib/test/test_bool.py new file mode 100644 index 0000000000..909a59a9d2 --- /dev/null +++ b/Lib/test/test_bool.py @@ -0,0 +1,369 @@ +# Test properties of bool promised by PEP 285 + +import unittest +from test import support + +import os + +class BoolTest(unittest.TestCase): + + def test_subclass(self): + try: + class C(bool): + pass + except TypeError: + pass + else: + self.fail("bool should not be subclassable") + + self.assertRaises(TypeError, int.__new__, bool, 0) + + def test_print(self): + try: + with open(support.TESTFN, "w") as fo: + print(False, True, file=fo) + with open(support.TESTFN, "r") as fi: + self.assertEqual(fi.read(), 'False True\n') + finally: + os.remove(support.TESTFN) + + def test_repr(self): + self.assertEqual(repr(False), 'False') + self.assertEqual(repr(True), 'True') + self.assertEqual(eval(repr(False)), False) + self.assertEqual(eval(repr(True)), True) + + def test_str(self): + self.assertEqual(str(False), 'False') + self.assertEqual(str(True), 'True') + + def test_int(self): + self.assertEqual(int(False), 0) + self.assertIsNot(int(False), False) + self.assertEqual(int(True), 1) + self.assertIsNot(int(True), True) + + def test_float(self): + self.assertEqual(float(False), 0.0) + self.assertIsNot(float(False), False) + self.assertEqual(float(True), 1.0) + self.assertIsNot(float(True), True) + + def test_math(self): + self.assertEqual(+False, 0) + self.assertIsNot(+False, False) + self.assertEqual(-False, 0) + self.assertIsNot(-False, False) + self.assertEqual(abs(False), 0) + self.assertIsNot(abs(False), False) + self.assertEqual(+True, 1) + self.assertIsNot(+True, True) + self.assertEqual(-True, -1) + self.assertEqual(abs(True), 1) + self.assertIsNot(abs(True), True) + self.assertEqual(~False, -1) + self.assertEqual(~True, -2) + + self.assertEqual(False+2, 2) + self.assertEqual(True+2, 3) + self.assertEqual(2+False, 2) + self.assertEqual(2+True, 3) + + self.assertEqual(False+False, 0) + self.assertIsNot(False+False, False) + self.assertEqual(False+True, 1) + self.assertIsNot(False+True, True) + self.assertEqual(True+False, 1) + self.assertIsNot(True+False, True) + self.assertEqual(True+True, 2) + + self.assertEqual(True-True, 0) + self.assertIsNot(True-True, False) + self.assertEqual(False-False, 0) + self.assertIsNot(False-False, False) + self.assertEqual(True-False, 1) + self.assertIsNot(True-False, True) + self.assertEqual(False-True, -1) + + self.assertEqual(True*1, 1) + self.assertEqual(False*1, 0) + self.assertIsNot(False*1, False) + + self.assertEqual(True/1, 1) + self.assertIsNot(True/1, True) + self.assertEqual(False/1, 0) + self.assertIsNot(False/1, False) + + self.assertEqual(True%1, 0) + self.assertIsNot(True%1, False) + self.assertEqual(True%2, 1) + self.assertIsNot(True%2, True) + self.assertEqual(False%1, 0) + self.assertIsNot(False%1, False) + + for b in False, True: + for i in 0, 1, 2: + self.assertEqual(b**i, int(b)**i) + self.assertIsNot(b**i, bool(int(b)**i)) + + for a in False, True: + for b in False, True: + self.assertIs(a&b, bool(int(a)&int(b))) + self.assertIs(a|b, bool(int(a)|int(b))) + self.assertIs(a^b, bool(int(a)^int(b))) + self.assertEqual(a&int(b), int(a)&int(b)) + self.assertIsNot(a&int(b), bool(int(a)&int(b))) + self.assertEqual(a|int(b), int(a)|int(b)) + self.assertIsNot(a|int(b), bool(int(a)|int(b))) + self.assertEqual(a^int(b), int(a)^int(b)) + self.assertIsNot(a^int(b), bool(int(a)^int(b))) + self.assertEqual(int(a)&b, int(a)&int(b)) + self.assertIsNot(int(a)&b, bool(int(a)&int(b))) + self.assertEqual(int(a)|b, int(a)|int(b)) + self.assertIsNot(int(a)|b, bool(int(a)|int(b))) + self.assertEqual(int(a)^b, int(a)^int(b)) + self.assertIsNot(int(a)^b, bool(int(a)^int(b))) + + self.assertIs(1==1, True) + self.assertIs(1==0, False) + self.assertIs(0<1, True) + self.assertIs(1<0, False) + self.assertIs(0<=0, True) + self.assertIs(1<=0, False) + self.assertIs(1>0, True) + self.assertIs(1>1, False) + self.assertIs(1>=1, True) + self.assertIs(0>=1, False) + self.assertIs(0!=1, True) + self.assertIs(0!=0, False) + + x = [1] + self.assertIs(x is x, True) + self.assertIs(x is not x, False) + + self.assertIs(1 in x, True) + self.assertIs(0 in x, False) + self.assertIs(1 not in x, False) + self.assertIs(0 not in x, True) + + x = {1: 2} + self.assertIs(x is x, True) + self.assertIs(x is not x, False) + + self.assertIs(1 in x, True) + self.assertIs(0 in x, False) + self.assertIs(1 not in x, False) + self.assertIs(0 not in x, True) + + self.assertIs(not True, False) + self.assertIs(not False, True) + + def test_convert(self): + self.assertRaises(TypeError, bool, 42, 42) + self.assertIs(bool(10), True) + self.assertIs(bool(1), True) + self.assertIs(bool(-1), True) + self.assertIs(bool(0), False) + self.assertIs(bool("hello"), True) + self.assertIs(bool(""), False) + self.assertIs(bool(), False) + + def test_keyword_args(self): + with self.assertRaisesRegex(TypeError, 'keyword argument'): + bool(x=10) + + def test_format(self): + self.assertEqual("%d" % False, "0") + self.assertEqual("%d" % True, "1") + self.assertEqual("%x" % False, "0") + self.assertEqual("%x" % True, "1") + + def test_hasattr(self): + self.assertIs(hasattr([], "append"), True) + self.assertIs(hasattr([], "wobble"), False) + + def test_callable(self): + self.assertIs(callable(len), True) + self.assertIs(callable(1), False) + + def test_isinstance(self): + self.assertIs(isinstance(True, bool), True) + self.assertIs(isinstance(False, bool), True) + self.assertIs(isinstance(True, int), True) + self.assertIs(isinstance(False, int), True) + self.assertIs(isinstance(1, bool), False) + self.assertIs(isinstance(0, bool), False) + + def test_issubclass(self): + self.assertIs(issubclass(bool, int), True) + self.assertIs(issubclass(int, bool), False) + + def test_contains(self): + self.assertIs(1 in {}, False) + self.assertIs(1 in {1:1}, True) + + def test_string(self): + self.assertIs("xyz".endswith("z"), True) + self.assertIs("xyz".endswith("x"), False) + self.assertIs("xyz0123".isalnum(), True) + self.assertIs("@#$%".isalnum(), False) + self.assertIs("xyz".isalpha(), True) + self.assertIs("@#$%".isalpha(), False) + self.assertIs("0123".isdigit(), True) + self.assertIs("xyz".isdigit(), False) + self.assertIs("xyz".islower(), True) + self.assertIs("XYZ".islower(), False) + self.assertIs("0123".isdecimal(), True) + self.assertIs("xyz".isdecimal(), False) + self.assertIs("0123".isnumeric(), True) + self.assertIs("xyz".isnumeric(), False) + self.assertIs(" ".isspace(), True) + self.assertIs("\xa0".isspace(), True) + self.assertIs("\u3000".isspace(), True) + self.assertIs("XYZ".isspace(), False) + self.assertIs("X".istitle(), True) + self.assertIs("x".istitle(), False) + self.assertIs("XYZ".isupper(), True) + self.assertIs("xyz".isupper(), False) + self.assertIs("xyz".startswith("x"), True) + self.assertIs("xyz".startswith("z"), False) + + def test_boolean(self): + self.assertEqual(True & 1, 1) + self.assertNotIsInstance(True & 1, bool) + self.assertIs(True & True, True) + + self.assertEqual(True | 1, 1) + self.assertNotIsInstance(True | 1, bool) + self.assertIs(True | True, True) + + self.assertEqual(True ^ 1, 0) + self.assertNotIsInstance(True ^ 1, bool) + self.assertIs(True ^ True, False) + + def test_fileclosed(self): + try: + with open(support.TESTFN, "w") as f: + self.assertIs(f.closed, False) + self.assertIs(f.closed, True) + finally: + os.remove(support.TESTFN) + + def test_types(self): + # types are always true. + for t in [bool, complex, dict, float, int, list, object, + set, str, tuple, type]: + self.assertIs(bool(t), True) + + def test_operator(self): + import operator + self.assertIs(operator.truth(0), False) + self.assertIs(operator.truth(1), True) + self.assertIs(operator.not_(1), False) + self.assertIs(operator.not_(0), True) + self.assertIs(operator.contains([], 1), False) + self.assertIs(operator.contains([1], 1), True) + self.assertIs(operator.lt(0, 0), False) + self.assertIs(operator.lt(0, 1), True) + self.assertIs(operator.is_(True, True), True) + self.assertIs(operator.is_(True, False), False) + self.assertIs(operator.is_not(True, True), False) + self.assertIs(operator.is_not(True, False), True) + + def test_marshal(self): + import marshal + self.assertIs(marshal.loads(marshal.dumps(True)), True) + self.assertIs(marshal.loads(marshal.dumps(False)), False) + + def test_pickle(self): + import pickle + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + self.assertIs(pickle.loads(pickle.dumps(True, proto)), True) + self.assertIs(pickle.loads(pickle.dumps(False, proto)), False) + + def test_picklevalues(self): + # Test for specific backwards-compatible pickle values + import pickle + self.assertEqual(pickle.dumps(True, protocol=0), b"I01\n.") + self.assertEqual(pickle.dumps(False, protocol=0), b"I00\n.") + self.assertEqual(pickle.dumps(True, protocol=1), b"I01\n.") + self.assertEqual(pickle.dumps(False, protocol=1), b"I00\n.") + self.assertEqual(pickle.dumps(True, protocol=2), b'\x80\x02\x88.') + self.assertEqual(pickle.dumps(False, protocol=2), b'\x80\x02\x89.') + + def test_convert_to_bool(self): + # Verify that TypeError occurs when bad things are returned + # from __bool__(). This isn't really a bool test, but + # it's related. + check = lambda o: self.assertRaises(TypeError, bool, o) + class Foo(object): + def __bool__(self): + return self + check(Foo()) + + class Bar(object): + def __bool__(self): + return "Yes" + check(Bar()) + + class Baz(int): + def __bool__(self): + return self + check(Baz()) + + # __bool__() must return a bool not an int + class Spam(int): + def __bool__(self): + return 1 + check(Spam()) + + class Eggs: + def __len__(self): + return -1 + self.assertRaises(ValueError, bool, Eggs()) + + def test_from_bytes(self): + self.assertIs(bool.from_bytes(b'\x00'*8, 'big'), False) + self.assertIs(bool.from_bytes(b'abcd', 'little'), True) + + def test_sane_len(self): + # this test just tests our assumptions about __len__ + # this will start failing if __len__ changes assertions + for badval in ['illegal', -1, 1 << 32]: + class A: + def __len__(self): + return badval + try: + bool(A()) + except (Exception) as e_bool: + try: + len(A()) + except (Exception) as e_len: + self.assertEqual(str(e_bool), str(e_len)) + + def test_blocked(self): + class A: + __bool__ = None + self.assertRaises(TypeError, bool, A()) + + class B: + def __len__(self): + return 10 + __bool__ = None + self.assertRaises(TypeError, bool, B()) + + def test_real_and_imag(self): + self.assertEqual(True.real, 1) + self.assertEqual(True.imag, 0) + self.assertIs(type(True.real), int) + self.assertIs(type(True.imag), int) + self.assertEqual(False.real, 0) + self.assertEqual(False.imag, 0) + self.assertIs(type(False.real), int) + self.assertIs(type(False.imag), int) + +def test_main(): + support.run_unittest(BoolTest) + +if __name__ == "__main__": + test_main() From adeb00ffbc7bf3ee39f200be0df0e745d0e4b8ad Mon Sep 17 00:00:00 2001 From: Aviv Palivoda Date: Sat, 2 Nov 2019 14:54:31 +0200 Subject: [PATCH 2/2] Allow test_bool.py to run --- Lib/test/support/__init__.py | 206 ++++++++++++++++----------------- Lib/test/support/testresult.py | 6 +- 2 files changed, 107 insertions(+), 105 deletions(-) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 9e60d960ab..8a503f98e9 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -3,20 +3,20 @@ if __name__ != 'test.support': raise ImportError('support must be imported from the test package') -import asyncio.events +# import asyncio.events import collections.abc import contextlib import datetime import errno -import faulthandler +# import faulthandler import fnmatch import functools -import gc +# import gc import importlib import importlib.util import io -import logging.handlers -import nntplib +# import logging.handlers +# import nntplib import os import platform import re @@ -27,13 +27,13 @@ import subprocess import sys import sysconfig -import tempfile +# import tempfile import _thread -import threading +# import threading import time import types import unittest -import urllib.error +# import urllib.error import warnings from .testresult import get_test_runner @@ -757,22 +757,22 @@ def bind_unix_socket(sock, addr): sock.close() raise unittest.SkipTest('cannot bind AF_UNIX sockets') -def _is_ipv6_enabled(): - """Check whether IPv6 is enabled on this host.""" - if socket.has_ipv6: - sock = None - try: - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - sock.bind((HOSTv6, 0)) - return True - except OSError: - pass - finally: - if sock: - sock.close() - return False - -IPV6_ENABLED = _is_ipv6_enabled() +# def _is_ipv6_enabled(): +# """Check whether IPv6 is enabled on this host.""" +# if socket.has_ipv6: +# sock = None +# try: +# sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) +# sock.bind((HOSTv6, 0)) +# return True +# except OSError: +# pass +# finally: +# if sock: +# sock.close() +# return False + +# IPV6_ENABLED = _is_ipv6_enabled() def system_must_validate_cert(f): """Skip the test on TLS certificate validation failures.""" @@ -801,10 +801,10 @@ def dec(*args, **kwargs): # for a discussion of this number). SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 -# decorator for skipping tests on non-IEEE 754 platforms -requires_IEEE_754 = unittest.skipUnless( - float.__getformat__("double").startswith("IEEE"), - "test requires IEEE 754 doubles") +# # decorator for skipping tests on non-IEEE 754 platforms +# requires_IEEE_754 = unittest.skipUnless( +# float.__getformat__("double").startswith("IEEE"), +# "test requires IEEE 754 doubles") requires_zlib = unittest.skipUnless(zlib, 'requires zlib') @@ -929,35 +929,35 @@ def dec(*args, **kwargs): # the byte 0xff. Skip some unicode filename tests. pass -# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be -# decoded from the filesystem encoding (in strict mode). It can be None if we -# cannot generate such filename (ex: the latin1 encoding can decode any byte -# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks -# to the surrogateescape error handler (PEP 383), but not from the filesystem -# encoding in strict mode. -TESTFN_UNDECODABLE = None -for name in ( - # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows - # accepts it to create a file or a directory, or don't accept to enter to - # such directory (when the bytes name is used). So test b'\xe7' first: it is - # not decodable from cp932. - b'\xe7w\xf0', - # undecodable from ASCII, UTF-8 - b'\xff', - # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 - # and cp857 - b'\xae\xd5' - # undecodable from UTF-8 (UNIX and Mac OS X) - b'\xed\xb2\x80', b'\xed\xb4\x80', - # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, - # cp1253, cp1254, cp1255, cp1257, cp1258 - b'\x81\x98', -): - try: - name.decode(TESTFN_ENCODING) - except UnicodeDecodeError: - TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name - break +# # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be +# # decoded from the filesystem encoding (in strict mode). It can be None if we +# # cannot generate such filename (ex: the latin1 encoding can decode any byte +# # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks +# # to the surrogateescape error handler (PEP 383), but not from the filesystem +# # encoding in strict mode. +# TESTFN_UNDECODABLE = None +# for name in ( +# # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows +# # accepts it to create a file or a directory, or don't accept to enter to +# # such directory (when the bytes name is used). So test b'\xe7' first: it is +# # not decodable from cp932. +# b'\xe7w\xf0', +# # undecodable from ASCII, UTF-8 +# b'\xff', +# # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 +# # and cp857 +# b'\xae\xd5' +# # undecodable from UTF-8 (UNIX and Mac OS X) +# b'\xed\xb2\x80', b'\xed\xb4\x80', +# # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, +# # cp1253, cp1254, cp1255, cp1257, cp1258 +# b'\x81\x98', +# ): +# try: +# name.decode(TESTFN_ENCODING) +# except UnicodeDecodeError: +# TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name +# break if FS_NONASCII: TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII @@ -2414,33 +2414,33 @@ def optim_args_from_interpreter_flags(): # Support for assertions about logging. #============================================================ -class TestHandler(logging.handlers.BufferingHandler): - def __init__(self, matcher): - # BufferingHandler takes a "capacity" argument - # so as to know when to flush. As we're overriding - # shouldFlush anyway, we can set a capacity of zero. - # You can call flush() manually to clear out the - # buffer. - logging.handlers.BufferingHandler.__init__(self, 0) - self.matcher = matcher - - def shouldFlush(self): - return False - - def emit(self, record): - self.format(record) - self.buffer.append(record.__dict__) - - def matches(self, **kwargs): - """ - Look for a saved dict whose keys/values match the supplied arguments. - """ - result = False - for d in self.buffer: - if self.matcher.matches(d, **kwargs): - result = True - break - return result +# class TestHandler(logging.handlers.BufferingHandler): +# def __init__(self, matcher): +# # BufferingHandler takes a "capacity" argument +# # so as to know when to flush. As we're overriding +# # shouldFlush anyway, we can set a capacity of zero. +# # You can call flush() manually to clear out the +# # buffer. +# logging.handlers.BufferingHandler.__init__(self, 0) +# self.matcher = matcher + +# def shouldFlush(self): +# return False + +# def emit(self, record): +# self.format(record) +# self.buffer.append(record.__dict__) + +# def matches(self, **kwargs): +# """ +# Look for a saved dict whose keys/values match the supplied arguments. +# """ +# result = False +# for d in self.buffer: +# if self.matcher.matches(d, **kwargs): +# result = True +# break +# return result class Matcher(object): @@ -2794,23 +2794,23 @@ def run_in_subinterp(code): return _testcapi.run_in_subinterp(code) -def check_free_after_iterating(test, iter, cls, args=()): - class A(cls): - def __del__(self): - nonlocal done - done = True - try: - next(it) - except StopIteration: - pass +# def check_free_after_iterating(test, iter, cls, args=()): +# class A(cls): +# def __del__(self): +# nonlocal done +# done = True +# try: +# next(it) +# except StopIteration: +# pass - done = False - it = iter(A(*args)) - # Issue 26494: Shouldn't crash - test.assertRaises(StopIteration, next, it) - # The sequence should be deallocated just after the end of iterating - gc_collect() - test.assertTrue(done) +# done = False +# it = iter(A(*args)) +# # Issue 26494: Shouldn't crash +# test.assertRaises(StopIteration, next, it) +# # The sequence should be deallocated just after the end of iterating +# gc_collect() +# test.assertTrue(done) def missing_compiler_executable(cmd_names=[]): @@ -2994,9 +2994,9 @@ def maybe_get_event_loop_policy(): """Return the global event loop policy if one is set, else return None.""" return asyncio.events._event_loop_policy -# Helpers for testing hashing. -NHASHBITS = sys.hash_info.width # number of bits in hash() result -assert NHASHBITS in (32, 64) +# # Helpers for testing hashing. +# NHASHBITS = sys.hash_info.width # number of bits in hash() result +# assert NHASHBITS in (32, 64) # Return mean and sdev of number of collisions when tossing nballs balls # uniformly at random into nbins bins. By definition, the number of diff --git a/Lib/test/support/testresult.py b/Lib/test/support/testresult.py index 67e126dcf7..4d8c99e228 100644 --- a/Lib/test/support/testresult.py +++ b/Lib/test/support/testresult.py @@ -43,7 +43,8 @@ def __getId(cls, test): def startTest(self, test): super().startTest(test) self.__e = e = ET.SubElement(self.__suite, 'testcase') - self.__start_time = time.perf_counter() + # self.__start_time = time.perf_counter() + self.__start_time = time.time() if self.__verbose: self.stream.write(f'{self.getDescription(test)} ... ') self.stream.flush() @@ -57,7 +58,8 @@ def _add_result(self, test, capture=False, **args): e.set('status', args.pop('status', 'run')) e.set('result', args.pop('result', 'completed')) if self.__start_time: - e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}') + # e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}') + e.set('time', f'{time.time() - self.__start_time}') if capture: if self._stdout_buffer is not None: