From 40b96bfffd4f7c1154b9ea50d89243528b8f96e3 Mon Sep 17 00:00:00 2001 From: CPython Developers <> Date: Fri, 24 Feb 2023 14:23:50 +0900 Subject: [PATCH 1/2] Update unittest from CPython 3.11 --- Lib/unittest/__init__.py | 11 +- Lib/unittest/async_case.py | 148 ++++------ Lib/unittest/case.py | 117 +++++--- Lib/unittest/loader.py | 77 +++--- Lib/unittest/main.py | 3 + Lib/unittest/mock.py | 146 ++++++---- Lib/unittest/result.py | 4 +- Lib/unittest/runner.py | 9 + Lib/unittest/test/__init__.py | 5 +- Lib/unittest/test/test_async_case.py | 198 ++++++++++++-- Lib/unittest/test/test_break.py | 147 +++++----- Lib/unittest/test/test_case.py | 65 +++-- Lib/unittest/test/test_discovery.py | 35 +-- Lib/unittest/test/test_functiontestcase.py | 8 +- Lib/unittest/test/test_loader.py | 47 ++++ Lib/unittest/test/test_program.py | 45 ++- Lib/unittest/test/test_result.py | 301 +++++++++++++++++---- Lib/unittest/test/test_runner.py | 185 ++++++++++++- Lib/unittest/test/test_skipping.py | 2 +- Lib/unittest/test/test_suite.py | 3 +- Lib/unittest/test/testmock/testasync.py | 38 ++- Lib/unittest/test/testmock/testmock.py | 24 ++ Lib/unittest/test/testmock/testwith.py | 4 +- 23 files changed, 1146 insertions(+), 476 deletions(-) diff --git a/Lib/unittest/__init__.py b/Lib/unittest/__init__.py index 348dc471f4..005d23f6d0 100644 --- a/Lib/unittest/__init__.py +++ b/Lib/unittest/__init__.py @@ -49,27 +49,30 @@ def testMultiply(self): 'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless', 'expectedFailure', 'TextTestResult', 'installHandler', 'registerResult', 'removeResult', 'removeHandler', - 'addModuleCleanup'] + 'addModuleCleanup', 'doModuleCleanups', 'enterModuleContext'] # Expose obsolete functions for backwards compatibility +# bpo-5846: Deprecated in Python 3.11, scheduled for removal in Python 3.13. __all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases']) __unittest = True from .result import TestResult from .case import (addModuleCleanup, TestCase, FunctionTestCase, SkipTest, skip, - skipIf, skipUnless, expectedFailure) + skipIf, skipUnless, expectedFailure, doModuleCleanups, + enterModuleContext) from .suite import BaseTestSuite, TestSuite -from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames, - findTestCases) +from .loader import TestLoader, defaultTestLoader from .main import TestProgram, main from .runner import TextTestRunner, TextTestResult from .signals import installHandler, registerResult, removeResult, removeHandler # IsolatedAsyncioTestCase will be imported lazily. +from .loader import makeSuite, getTestCaseNames, findTestCases # deprecated _TextTestResult = TextTestResult + # There are no tests here, so don't try to run anything discovered from # introspecting the symbols (e.g. FunctionTestCase). Instead, all our # tests come from within unittest.test. diff --git a/Lib/unittest/async_case.py b/Lib/unittest/async_case.py index d9c694e368..bd2a471156 100644 --- a/Lib/unittest/async_case.py +++ b/Lib/unittest/async_case.py @@ -1,5 +1,7 @@ import asyncio +import contextvars import inspect +import warnings from .case import TestCase @@ -32,8 +34,8 @@ class IsolatedAsyncioTestCase(TestCase): def __init__(self, methodName='runTest'): super().__init__(methodName) - self._asyncioTestLoop = None - self._asyncioCallsQueue = None + self._asyncioRunner = None + self._asyncioTestContext = contextvars.copy_context() async def asyncSetUp(self): pass @@ -56,115 +58,85 @@ def addAsyncCleanup(self, func, /, *args, **kwargs): # 3. Regular "def func()" that returns awaitable object self.addCleanup(*(func, *args), **kwargs) + async def enterAsyncContext(self, cm): + """Enters the supplied asynchronous context manager. + + If successful, also adds its __aexit__ method as a cleanup + function and returns the result of the __aenter__ method. + """ + # We look up the special methods on the type to match the with + # statement. + cls = type(cm) + try: + enter = cls.__aenter__ + exit = cls.__aexit__ + except AttributeError: + raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " + f"not support the asynchronous context manager protocol" + ) from None + result = await enter(cm) + self.addAsyncCleanup(exit, cm, None, None, None) + return result + def _callSetUp(self): - self.setUp() + # Force loop to be initialized and set as the current loop + # so that setUp functions can use get_event_loop() and get the + # correct loop instance. + self._asyncioRunner.get_loop() + self._asyncioTestContext.run(self.setUp) self._callAsync(self.asyncSetUp) def _callTestMethod(self, method): - self._callMaybeAsync(method) + if self._callMaybeAsync(method) is not None: + warnings.warn(f'It is deprecated to return a value that is not None from a ' + f'test case ({method})', DeprecationWarning, stacklevel=4) def _callTearDown(self): self._callAsync(self.asyncTearDown) - self.tearDown() + self._asyncioTestContext.run(self.tearDown) def _callCleanup(self, function, *args, **kwargs): self._callMaybeAsync(function, *args, **kwargs) def _callAsync(self, func, /, *args, **kwargs): - assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' - ret = func(*args, **kwargs) - assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable' - fut = self._asyncioTestLoop.create_future() - self._asyncioCallsQueue.put_nowait((fut, ret)) - return self._asyncioTestLoop.run_until_complete(fut) + assert self._asyncioRunner is not None, 'asyncio runner is not initialized' + assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function' + return self._asyncioRunner.run( + func(*args, **kwargs), + context=self._asyncioTestContext + ) def _callMaybeAsync(self, func, /, *args, **kwargs): - assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' - ret = func(*args, **kwargs) - if inspect.isawaitable(ret): - fut = self._asyncioTestLoop.create_future() - self._asyncioCallsQueue.put_nowait((fut, ret)) - return self._asyncioTestLoop.run_until_complete(fut) + assert self._asyncioRunner is not None, 'asyncio runner is not initialized' + if inspect.iscoroutinefunction(func): + return self._asyncioRunner.run( + func(*args, **kwargs), + context=self._asyncioTestContext, + ) else: - return ret - - async def _asyncioLoopRunner(self, fut): - self._asyncioCallsQueue = queue = asyncio.Queue() - fut.set_result(None) - while True: - query = await queue.get() - queue.task_done() - if query is None: - return - fut, awaitable = query - try: - ret = await awaitable - if not fut.cancelled(): - fut.set_result(ret) - except (SystemExit, KeyboardInterrupt): - raise - except (BaseException, asyncio.CancelledError) as ex: - if not fut.cancelled(): - fut.set_exception(ex) - - def _setupAsyncioLoop(self): - assert self._asyncioTestLoop is None, 'asyncio test loop already initialized' - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.set_debug(True) - self._asyncioTestLoop = loop - fut = loop.create_future() - self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut)) - loop.run_until_complete(fut) - - def _tearDownAsyncioLoop(self): - assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' - loop = self._asyncioTestLoop - self._asyncioTestLoop = None - self._asyncioCallsQueue.put_nowait(None) - loop.run_until_complete(self._asyncioCallsQueue.join()) + return self._asyncioTestContext.run(func, *args, **kwargs) - try: - # cancel all tasks - to_cancel = asyncio.all_tasks(loop) - if not to_cancel: - return - - for task in to_cancel: - task.cancel() - - loop.run_until_complete( - asyncio.gather(*to_cancel, return_exceptions=True)) - - for task in to_cancel: - if task.cancelled(): - continue - if task.exception() is not None: - loop.call_exception_handler({ - 'message': 'unhandled exception during test shutdown', - 'exception': task.exception(), - 'task': task, - }) - # shutdown asyncgens - loop.run_until_complete(loop.shutdown_asyncgens()) - finally: - # Prevent our executor environment from leaking to future tests. - loop.run_until_complete(loop.shutdown_default_executor()) - asyncio.set_event_loop(None) - loop.close() + def _setupAsyncioRunner(self): + assert self._asyncioRunner is None, 'asyncio runner is already initialized' + runner = asyncio.Runner(debug=True) + self._asyncioRunner = runner + + def _tearDownAsyncioRunner(self): + runner = self._asyncioRunner + runner.close() def run(self, result=None): - self._setupAsyncioLoop() + self._setupAsyncioRunner() try: return super().run(result) finally: - self._tearDownAsyncioLoop() + self._tearDownAsyncioRunner() def debug(self): - self._setupAsyncioLoop() + self._setupAsyncioRunner() super().debug() - self._tearDownAsyncioLoop() + self._tearDownAsyncioRunner() def __del__(self): - if self._asyncioTestLoop is not None: - self._tearDownAsyncioLoop() + if self._asyncioRunner is not None: + self._tearDownAsyncioRunner() diff --git a/Lib/unittest/case.py b/Lib/unittest/case.py index 61003d0c6e..c4aa2d7721 100644 --- a/Lib/unittest/case.py +++ b/Lib/unittest/case.py @@ -47,12 +47,10 @@ def __init__(self, result=None): self.result = result self.result_supports_subtests = hasattr(result, "addSubTest") self.success = True - self.skipped = [] self.expectedFailure = None - self.errors = [] @contextlib.contextmanager - def testPartExecutor(self, test_case, isTest=False): + def testPartExecutor(self, test_case, subTest=False): old_success = self.success self.success = True try: @@ -61,7 +59,7 @@ def testPartExecutor(self, test_case, isTest=False): raise except SkipTest as e: self.success = False - self.skipped.append((test_case, str(e))) + _addSkip(self.result, test_case, str(e)) except _ShouldStop: pass except: @@ -70,27 +68,65 @@ def testPartExecutor(self, test_case, isTest=False): self.expectedFailure = exc_info else: self.success = False - self.errors.append((test_case, exc_info)) + if subTest: + self.result.addSubTest(test_case.test_case, test_case, exc_info) + else: + _addError(self.result, test_case, exc_info) # explicitly break a reference cycle: # exc_info -> frame -> exc_info exc_info = None else: - if self.result_supports_subtests and self.success: - self.errors.append((test_case, None)) + if subTest and self.success: + self.result.addSubTest(test_case.test_case, test_case, None) finally: self.success = self.success and old_success +def _addSkip(result, test_case, reason): + addSkip = getattr(result, 'addSkip', None) + if addSkip is not None: + addSkip(test_case, reason) + else: + warnings.warn("TestResult has no addSkip method, skips not reported", + RuntimeWarning, 2) + result.addSuccess(test_case) + +def _addError(result, test, exc_info): + if result is not None and exc_info is not None: + if issubclass(exc_info[0], test.failureException): + result.addFailure(test, exc_info) + else: + result.addError(test, exc_info) + def _id(obj): return obj +def _enter_context(cm, addcleanup): + # We look up the special methods on the type to match the with + # statement. + cls = type(cm) + try: + enter = cls.__enter__ + exit = cls.__exit__ + except AttributeError: + raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " + f"not support the context manager protocol") from None + result = enter(cm) + addcleanup(exit, cm, None, None, None) + return result + + _module_cleanups = [] def addModuleCleanup(function, /, *args, **kwargs): """Same as addCleanup, except the cleanup items are called even if setUpModule fails (unlike tearDownModule).""" _module_cleanups.append((function, args, kwargs)) +def enterModuleContext(cm): + """Same as enterContext, but module-wide.""" + return _enter_context(cm, addModuleCleanup) + def doModuleCleanups(): """Execute all module cleanup functions. Normally called for you after @@ -348,11 +384,11 @@ class TestCase(object): # of difflib. See #11763. _diffThreshold = 2**16 - # Attribute used by TestSuite for classSetUp - - _classSetupFailed = False - - _class_cleanups = [] + def __init_subclass__(cls, *args, **kwargs): + # Attribute used by TestSuite for classSetUp + cls._classSetupFailed = False + cls._class_cleanups = [] + super().__init_subclass__(*args, **kwargs) def __init__(self, methodName='runTest'): """Create an instance of the class that will use the named test @@ -409,12 +445,25 @@ def addCleanup(self, function, /, *args, **kwargs): Cleanup items are called even if setUp fails (unlike tearDown).""" self._cleanups.append((function, args, kwargs)) + def enterContext(self, cm): + """Enters the supplied context manager. + + If successful, also adds its __exit__ method as a cleanup + function and returns the result of the __enter__ method. + """ + return _enter_context(cm, self.addCleanup) + @classmethod def addClassCleanup(cls, function, /, *args, **kwargs): """Same as addCleanup, except the cleanup items are called even if setUpClass fails (unlike tearDownClass).""" cls._class_cleanups.append((function, args, kwargs)) + @classmethod + def enterClassContext(cls, cm): + """Same as enterContext, but class-wide.""" + return _enter_context(cm, cls.addClassCleanup) + def setUp(self): "Hook method for setting up the test fixture before exercising it." pass @@ -461,21 +510,12 @@ def __hash__(self): return hash((type(self), self._testMethodName)) def __str__(self): - return "%s (%s)" % (self._testMethodName, strclass(self.__class__)) + return "%s (%s.%s)" % (self._testMethodName, strclass(self.__class__), self._testMethodName) def __repr__(self): return "<%s testMethod=%s>" % \ (strclass(self.__class__), self._testMethodName) - def _addSkip(self, result, test_case, reason): - addSkip = getattr(result, 'addSkip', None) - if addSkip is not None: - addSkip(test_case, reason) - else: - warnings.warn("TestResult has no addSkip method, skips not reported", - RuntimeWarning, 2) - result.addSuccess(test_case) - @contextlib.contextmanager def subTest(self, msg=_subtest_msg_sentinel, **params): """Return a context manager that will return the enclosed block @@ -494,7 +534,7 @@ def subTest(self, msg=_subtest_msg_sentinel, **params): params_map = parent.params.new_child(params) self._subtest = _SubTest(self, msg, params_map) try: - with self._outcome.testPartExecutor(self._subtest, isTest=True): + with self._outcome.testPartExecutor(self._subtest, subTest=True): yield if not self._outcome.success: result = self._outcome.result @@ -507,16 +547,6 @@ def subTest(self, msg=_subtest_msg_sentinel, **params): finally: self._subtest = parent - def _feedErrorsToResult(self, result, errors): - for test, exc_info in errors: - if isinstance(test, _SubTest): - result.addSubTest(test.test_case, test, exc_info) - elif exc_info is not None: - if issubclass(exc_info[0], self.failureException): - result.addFailure(test, exc_info) - else: - result.addError(test, exc_info) - def _addExpectedFailure(self, result, exc_info): try: addExpectedFailure = result.addExpectedFailure @@ -546,7 +576,9 @@ def _callSetUp(self): self.setUp() def _callTestMethod(self, method): - method() + if method() is not None: + warnings.warn(f'It is deprecated to return a value that is not None from a ' + f'test case ({method})', DeprecationWarning, stacklevel=3) def _callTearDown(self): self.tearDown() @@ -572,7 +604,7 @@ def run(self, result=None): # If the class or method was skipped. skip_why = (getattr(self.__class__, '__unittest_skip_why__', '') or getattr(testMethod, '__unittest_skip_why__', '')) - self._addSkip(result, self, skip_why) + _addSkip(result, self, skip_why) return result expecting_failure = ( @@ -587,16 +619,13 @@ def run(self, result=None): self._callSetUp() if outcome.success: outcome.expecting_failure = expecting_failure - with outcome.testPartExecutor(self, isTest=True): + with outcome.testPartExecutor(self): self._callTestMethod(testMethod) outcome.expecting_failure = False with outcome.testPartExecutor(self): self._callTearDown() - self.doCleanups() - for test, reason in outcome.skipped: - self._addSkip(result, test, reason) - self._feedErrorsToResult(result, outcome.errors) + if outcome.success: if expecting_failure: if outcome.expectedFailure: @@ -607,11 +636,10 @@ def run(self, result=None): result.addSuccess(self) return result finally: - # explicitly break reference cycles: - # outcome.errors -> frame -> outcome -> outcome.errors + # explicitly break reference cycle: # outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure - outcome.errors.clear() outcome.expectedFailure = None + outcome = None # clear the outcome, no more needed self._outcome = None @@ -1146,8 +1174,7 @@ def assertDictEqual(self, d1, d2, msg=None): def assertDictContainsSubset(self, subset, dictionary, msg=None): """Checks whether dictionary is a superset of subset.""" warnings.warn('assertDictContainsSubset is deprecated', - DeprecationWarning, - stacklevel=2) + DeprecationWarning) missing = [] mismatched = [] for key, value in subset.items(): diff --git a/Lib/unittest/loader.py b/Lib/unittest/loader.py index ba7105e1ad..7e6ce2f224 100644 --- a/Lib/unittest/loader.py +++ b/Lib/unittest/loader.py @@ -286,8 +286,6 @@ def discover(self, start_dir, pattern='test*.py', top_level_dir=None): self._top_level_dir = top_level_dir is_not_importable = False - is_namespace = False - tests = [] if os.path.isdir(os.path.abspath(start_dir)): start_dir = os.path.abspath(start_dir) if start_dir != top_level_dir: @@ -303,50 +301,25 @@ def discover(self, start_dir, pattern='test*.py', top_level_dir=None): top_part = start_dir.split('.')[0] try: start_dir = os.path.abspath( - os.path.dirname((the_module.__file__))) + os.path.dirname((the_module.__file__))) except AttributeError: - # look for namespace packages - try: - spec = the_module.__spec__ - except AttributeError: - spec = None - - if spec and spec.loader is None: - if spec.submodule_search_locations is not None: - is_namespace = True - - for path in the_module.__path__: - if (not set_implicit_top and - not path.startswith(top_level_dir)): - continue - self._top_level_dir = \ - (path.split(the_module.__name__ - .replace(".", os.path.sep))[0]) - tests.extend(self._find_tests(path, - pattern, - namespace=True)) - elif the_module.__name__ in sys.builtin_module_names: + if the_module.__name__ in sys.builtin_module_names: # builtin module raise TypeError('Can not use builtin modules ' 'as dotted module names') from None else: raise TypeError( - 'don\'t know how to discover from {!r}' - .format(the_module)) from None + f"don't know how to discover from {the_module!r}" + ) from None if set_implicit_top: - if not is_namespace: - self._top_level_dir = \ - self._get_directory_containing_module(top_part) - sys.path.remove(top_level_dir) - else: - sys.path.remove(top_level_dir) + self._top_level_dir = self._get_directory_containing_module(top_part) + sys.path.remove(top_level_dir) if is_not_importable: raise ImportError('Start directory is not importable: %r' % start_dir) - if not is_namespace: - tests = list(self._find_tests(start_dir, pattern)) + tests = list(self._find_tests(start_dir, pattern)) return self.suiteClass(tests) def _get_directory_containing_module(self, module_name): @@ -381,7 +354,7 @@ def _match_path(self, path, full_path, pattern): # override this method to use alternative matching strategy return fnmatch(path, pattern) - def _find_tests(self, start_dir, pattern, namespace=False): + def _find_tests(self, start_dir, pattern): """Used by discovery. Yields test suites it loads.""" # Handle the __init__ in this package name = self._get_name_from_path(start_dir) @@ -390,8 +363,7 @@ def _find_tests(self, start_dir, pattern, namespace=False): if name != '.' and name not in self._loading_packages: # name is in self._loading_packages while we have called into # loadTestsFromModule with name. - tests, should_recurse = self._find_test_path( - start_dir, pattern, namespace) + tests, should_recurse = self._find_test_path(start_dir, pattern) if tests is not None: yield tests if not should_recurse: @@ -402,8 +374,7 @@ def _find_tests(self, start_dir, pattern, namespace=False): paths = sorted(os.listdir(start_dir)) for path in paths: full_path = os.path.join(start_dir, path) - tests, should_recurse = self._find_test_path( - full_path, pattern, namespace) + tests, should_recurse = self._find_test_path(full_path, pattern) if tests is not None: yield tests if should_recurse: @@ -411,11 +382,11 @@ def _find_tests(self, start_dir, pattern, namespace=False): name = self._get_name_from_path(full_path) self._loading_packages.add(name) try: - yield from self._find_tests(full_path, pattern, namespace) + yield from self._find_tests(full_path, pattern) finally: self._loading_packages.discard(name) - def _find_test_path(self, full_path, pattern, namespace=False): + def _find_test_path(self, full_path, pattern): """Used by discovery. Loads tests from a single file, or a directories' __init__.py when @@ -459,8 +430,7 @@ def _find_test_path(self, full_path, pattern, namespace=False): msg % (mod_name, module_dir, expected_dir)) return self.loadTestsFromModule(module, pattern=pattern), False elif os.path.isdir(full_path): - if (not namespace and - not os.path.isfile(os.path.join(full_path, '__init__.py'))): + if not os.path.isfile(os.path.join(full_path, '__init__.py')): return None, False load_tests = None @@ -494,6 +464,9 @@ def _find_test_path(self, full_path, pattern, namespace=False): defaultTestLoader = TestLoader() +# These functions are considered obsolete for long time. +# They will be removed in Python 3.13. + def _makeLoader(prefix, sortUsing, suiteClass=None, testNamePatterns=None): loader = TestLoader() loader.sortTestMethodsUsing = sortUsing @@ -504,14 +477,32 @@ def _makeLoader(prefix, sortUsing, suiteClass=None, testNamePatterns=None): return loader def getTestCaseNames(testCaseClass, prefix, sortUsing=util.three_way_cmp, testNamePatterns=None): + import warnings + warnings.warn( + "unittest.getTestCaseNames() is deprecated and will be removed in Python 3.13. " + "Please use unittest.TestLoader.getTestCaseNames() instead.", + DeprecationWarning, stacklevel=2 + ) return _makeLoader(prefix, sortUsing, testNamePatterns=testNamePatterns).getTestCaseNames(testCaseClass) def makeSuite(testCaseClass, prefix='test', sortUsing=util.three_way_cmp, suiteClass=suite.TestSuite): + import warnings + warnings.warn( + "unittest.makeSuite() is deprecated and will be removed in Python 3.13. " + "Please use unittest.TestLoader.loadTestsFromTestCase() instead.", + DeprecationWarning, stacklevel=2 + ) return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase( testCaseClass) def findTestCases(module, prefix='test', sortUsing=util.three_way_cmp, suiteClass=suite.TestSuite): + import warnings + warnings.warn( + "unittest.findTestCases() is deprecated and will be removed in Python 3.13. " + "Please use unittest.TestLoader.loadTestsFromModule() instead.", + DeprecationWarning, stacklevel=2 + ) return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromModule(\ module) diff --git a/Lib/unittest/main.py b/Lib/unittest/main.py index 88a188c545..046fbd3a45 100644 --- a/Lib/unittest/main.py +++ b/Lib/unittest/main.py @@ -3,6 +3,7 @@ import sys import argparse import os +import warnings from . import loader, runner from .signals import installHandler @@ -101,6 +102,8 @@ def __init__(self, module='__main__', defaultTest=None, argv=None, self.runTests() def usageExit(self, msg=None): + warnings.warn("TestProgram.usageExit() is deprecated and will be" + " removed in Python 3.13", DeprecationWarning) if msg: print(msg) if self._discovery_parser is None: diff --git a/Lib/unittest/mock.py b/Lib/unittest/mock.py index 7152f86ed9..fa0bd9131a 100644 --- a/Lib/unittest/mock.py +++ b/Lib/unittest/mock.py @@ -30,10 +30,12 @@ import pprint import sys import builtins +import pkgutil from asyncio import iscoroutinefunction from types import CodeType, ModuleType, MethodType from unittest.util import safe_repr from functools import wraps, partial +from threading import RLock class InvalidSpecError(Exception): @@ -401,6 +403,14 @@ def __init__(self, /, *args, **kwargs): class NonCallableMock(Base): """A non-callable version of `Mock`""" + # Store a mutex as a class attribute in order to protect concurrent access + # to mock attributes. Using a class attribute allows all NonCallableMock + # instances to share the mutex for simplicity. + # + # See https://github.com/python/cpython/issues/98624 for why this is + # necessary. + _lock = RLock() + def __new__(cls, /, *args, **kw): # every instance has its own class # so we can create magic methods on the @@ -488,6 +498,9 @@ def mock_add_spec(self, spec, spec_set=False): def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, _eat_self=False): + if _is_instance_mock(spec): + raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') + _spec_class = None _spec_signature = None _spec_asyncs = [] @@ -634,41 +647,42 @@ def __getattr__(self, name): raise AttributeError("Mock object has no attribute %r" % name) elif _is_magic(name): raise AttributeError(name) - if not self._mock_unsafe: + if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods): if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')): raise AttributeError( f"{name!r} is not a valid assertion. Use a spec " f"for the mock if {name!r} is meant to be an attribute.") - result = self._mock_children.get(name) - if result is _deleted: - raise AttributeError(name) - elif result is None: - wraps = None - if self._mock_wraps is not None: - # XXXX should we get the attribute without triggering code - # execution? - wraps = getattr(self._mock_wraps, name) - - result = self._get_child_mock( - parent=self, name=name, wraps=wraps, _new_name=name, - _new_parent=self - ) - self._mock_children[name] = result - - elif isinstance(result, _SpecState): - try: - result = create_autospec( - result.spec, result.spec_set, result.instance, - result.parent, result.name + with NonCallableMock._lock: + result = self._mock_children.get(name) + if result is _deleted: + raise AttributeError(name) + elif result is None: + wraps = None + if self._mock_wraps is not None: + # XXXX should we get the attribute without triggering code + # execution? + wraps = getattr(self._mock_wraps, name) + + result = self._get_child_mock( + parent=self, name=name, wraps=wraps, _new_name=name, + _new_parent=self ) - except InvalidSpecError: - target_name = self.__dict__['_mock_name'] or self - raise InvalidSpecError( - f'Cannot autospec attr {name!r} from target ' - f'{target_name!r} as it has already been mocked out. ' - f'[target={self!r}, attr={result.spec!r}]') - self._mock_children[name] = result + self._mock_children[name] = result + + elif isinstance(result, _SpecState): + try: + result = create_autospec( + result.spec, result.spec_set, result.instance, + result.parent, result.name + ) + except InvalidSpecError: + target_name = self.__dict__['_mock_name'] or self + raise InvalidSpecError( + f'Cannot autospec attr {name!r} from target ' + f'{target_name!r} as it has already been mocked out. ' + f'[target={self!r}, attr={result.spec!r}]') + self._mock_children[name] = result return result @@ -1000,15 +1014,15 @@ def _get_child_mock(self, /, **kw): For non-callable mocks the callable variant will be used (rather than any custom subclass).""" - _new_name = kw.get("_new_name") - if _new_name in self.__dict__['_spec_asyncs']: - return AsyncMock(**kw) - if self._mock_sealed: attribute = f".{kw['name']}" if "name" in kw else "()" mock_name = self._extract_mock_name() + attribute raise AttributeError(mock_name) + _new_name = kw.get("_new_name") + if _new_name in self.__dict__['_spec_asyncs']: + return AsyncMock(**kw) + _type = type(self) if issubclass(_type, MagicMock) and _new_name in _async_method_magics: # Any asynchronous magic becomes an AsyncMock @@ -1214,6 +1228,11 @@ class or instance) that acts as the specification for the mock object. If this is a new Mock (created on first access). See the `return_value` attribute. + * `unsafe`: By default, accessing any attribute whose name starts with + *assert*, *assret*, *asert*, *aseert* or *assrt* will raise an + AttributeError. Passing `unsafe=True` will allow access to + these attributes. + * `wraps`: Item for the mock object to wrap. If `wraps` is not None then calling the Mock will pass the call through to the wrapped object (returning the real result). Attribute access on the mock will return a @@ -1233,25 +1252,6 @@ class or instance) that acts as the specification for the mock object. If """ -def _dot_lookup(thing, comp, import_path): - try: - return getattr(thing, comp) - except AttributeError: - __import__(import_path) - return getattr(thing, comp) - - -def _importer(target): - components = target.split('.') - import_path = components.pop(0) - thing = __import__(import_path) - - for comp in components: - import_path += ".%s" % comp - thing = _dot_lookup(thing, comp, import_path) - return thing - - # _check_spec_arg_typos takes kwargs from commands like patch and checks that # they don't contain common misspellings of arguments related to autospeccing. def _check_spec_arg_typos(kwargs_to_check): @@ -1605,8 +1605,7 @@ def _get_target(target): except (TypeError, ValueError, AttributeError): raise TypeError( f"Need a valid target to patch. You supplied: {target!r}") - getter = lambda: _importer(target) - return getter, attribute + return partial(pkgutil.resolve_name, target), attribute def _patch_object( @@ -1661,7 +1660,7 @@ def _patch_multiple(target, spec=None, create=False, spec_set=None, for choosing which methods to wrap. """ if type(target) is str: - getter = lambda: _importer(target) + getter = partial(pkgutil.resolve_name, target) else: getter = lambda: target @@ -1810,6 +1809,12 @@ def __init__(self, in_dict, values=(), clear=False, **kwargs): def __call__(self, f): if isinstance(f, type): return self.decorate_class(f) + if inspect.iscoroutinefunction(f): + return self.decorate_async_callable(f) + return self.decorate_callable(f) + + + def decorate_callable(self, f): @wraps(f) def _inner(*args, **kw): self._patch_dict() @@ -1821,6 +1826,18 @@ def _inner(*args, **kw): return _inner + def decorate_async_callable(self, f): + @wraps(f) + async def _inner(*args, **kw): + self._patch_dict() + try: + return await f(*args, **kw) + finally: + self._unpatch_dict() + + return _inner + + def decorate_class(self, klass): for attr in dir(klass): attr_value = getattr(klass, attr) @@ -1841,7 +1858,7 @@ def __enter__(self): def _patch_dict(self): values = self.values if isinstance(self.in_dict, str): - self.in_dict = _importer(self.in_dict) + self.in_dict = pkgutil.resolve_name(self.in_dict) in_dict = self.in_dict clear = self.clear @@ -1942,7 +1959,7 @@ def _patch_stopall(): ) numerics = ( - "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv" + "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow" ) inplace = ' '.join('i%s' % n for n in numerics.split()) right = ' '.join('r%s' % n for n in numerics.split()) @@ -1954,7 +1971,7 @@ def _patch_stopall(): _non_defaults = { '__get__', '__set__', '__delete__', '__reversed__', '__missing__', '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', - '__getstate__', '__setstate__', '__getformat__', '__setformat__', + '__getstate__', '__setstate__', '__getformat__', '__repr__', '__dir__', '__subclasses__', '__format__', '__getnewargs_ex__', } @@ -2186,6 +2203,10 @@ def __init__(self, /, *args, **kwargs): code_mock = NonCallableMock(spec_set=CodeType) code_mock.co_flags = inspect.CO_COROUTINE self.__dict__['__code__'] = code_mock + self.__dict__['__name__'] = 'AsyncMock' + self.__dict__['__defaults__'] = tuple() + self.__dict__['__kwdefaults__'] = {} + self.__dict__['__annotations__'] = None async def _execute_mock_call(self, /, *args, **kwargs): # This is nearly just like super(), except for special handling @@ -2204,7 +2225,7 @@ async def _execute_mock_call(self, /, *args, **kwargs): try: result = next(effect) except StopIteration: - # It is impossible to propogate a StopIteration + # It is impossible to propagate a StopIteration # through coroutines because of PEP 479 raise StopAsyncIteration if _is_exception(result): @@ -2803,6 +2824,7 @@ def __init__(self, spec, spec_set=False, parent=None, file_spec = None +open_spec = None def _to_stream(read_data): @@ -2859,8 +2881,12 @@ def _next_side_effect(): import _io file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) + global open_spec + if open_spec is None: + import _io + open_spec = list(set(dir(_io.open))) if mock is None: - mock = MagicMock(name='open', spec=open) + mock = MagicMock(name='open', spec=open_spec) handle = MagicMock(spec=file_spec) handle.__enter__.return_value = handle diff --git a/Lib/unittest/result.py b/Lib/unittest/result.py index 3da7005e60..5ca4c23238 100644 --- a/Lib/unittest/result.py +++ b/Lib/unittest/result.py @@ -196,6 +196,7 @@ def _clean_tracebacks(self, exctype, value, tb, test): ret = None first = True excs = [(exctype, value, tb)] + seen = {id(value)} # Detect loops in chained exceptions. while excs: (exctype, value, tb) = excs.pop() # Skip test runner traceback levels @@ -214,8 +215,9 @@ def _clean_tracebacks(self, exctype, value, tb, test): if value is not None: for c in (value.__cause__, value.__context__): - if c is not None: + if c is not None and id(c) not in seen: excs.append((type(c), c, c.__traceback__)) + seen.add(id(c)) return ret def _is_relevant_tb_level(self, tb): diff --git a/Lib/unittest/runner.py b/Lib/unittest/runner.py index 6678adb6a7..cb452c7ade 100644 --- a/Lib/unittest/runner.py +++ b/Lib/unittest/runner.py @@ -200,6 +200,15 @@ def run(self, test): if self.warnings: # if self.warnings is set, use it to filter all the warnings warnings.simplefilter(self.warnings) + # if the filter is 'default' or 'always', special-case the + # warnings from the deprecated unittest methods to show them + # no more than once per module, because they can be fairly + # noisy. The -Wd and -Wa flags can be used to bypass this + # only when self.warnings is None. + if self.warnings in ['default', 'always']: + warnings.filterwarnings('module', + category=DeprecationWarning, + message=r'Please use assert\w+ instead.') startTime = time.perf_counter() startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: diff --git a/Lib/unittest/test/__init__.py b/Lib/unittest/test/__init__.py index cdae8a7442..143f4ab5a3 100644 --- a/Lib/unittest/test/__init__.py +++ b/Lib/unittest/test/__init__.py @@ -11,7 +11,10 @@ def suite(): for fn in os.listdir(here): if fn.startswith("test") and fn.endswith(".py"): modname = "unittest.test." + fn[:-3] - __import__(modname) + try: + __import__(modname) + except unittest.SkipTest: + continue module = sys.modules[modname] suite.addTest(loader.loadTestsFromModule(module)) suite.addTest(loader.loadTestsFromName('unittest.test.testmock')) diff --git a/Lib/unittest/test/test_async_case.py b/Lib/unittest/test/test_async_case.py index 49a9cf8683..6889c39127 100644 --- a/Lib/unittest/test/test_async_case.py +++ b/Lib/unittest/test/test_async_case.py @@ -1,7 +1,10 @@ import asyncio +import contextvars import unittest from test import support +support.requires_working_socket(module=True) + class MyException(Exception): pass @@ -11,77 +14,135 @@ def tearDownModule(): asyncio.set_event_loop_policy(None) +class TestCM: + def __init__(self, ordering, enter_result=None): + self.ordering = ordering + self.enter_result = enter_result + + async def __aenter__(self): + self.ordering.append('enter') + return self.enter_result + + async def __aexit__(self, *exc_info): + self.ordering.append('exit') + + +class LacksEnterAndExit: + pass +class LacksEnter: + async def __aexit__(self, *exc_info): + pass +class LacksExit: + async def __aenter__(self): + pass + + +VAR = contextvars.ContextVar('VAR', default=()) + + @unittest.skip("TODO: RUSTPYTHON; requires sys.get_coroutine_origin_tracking_depth()") class TestAsyncCase(unittest.TestCase): maxDiff = None - def tearDown(self): + def setUp(self): # Ensure that IsolatedAsyncioTestCase instances are destroyed before # starting a new event loop - support.gc_collect() + self.addCleanup(support.gc_collect) def test_full_cycle(self): + expected = ['setUp', + 'asyncSetUp', + 'test', + 'asyncTearDown', + 'tearDown', + 'cleanup6', + 'cleanup5', + 'cleanup4', + 'cleanup3', + 'cleanup2', + 'cleanup1'] class Test(unittest.IsolatedAsyncioTestCase): def setUp(self): self.assertEqual(events, []) events.append('setUp') + VAR.set(VAR.get() + ('setUp',)) + self.addCleanup(self.on_cleanup1) + self.addAsyncCleanup(self.on_cleanup2) async def asyncSetUp(self): - self.assertEqual(events, ['setUp']) + self.assertEqual(events, expected[:1]) events.append('asyncSetUp') - self.addAsyncCleanup(self.on_cleanup1) + VAR.set(VAR.get() + ('asyncSetUp',)) + self.addCleanup(self.on_cleanup3) + self.addAsyncCleanup(self.on_cleanup4) async def test_func(self): - self.assertEqual(events, ['setUp', - 'asyncSetUp']) + self.assertEqual(events, expected[:2]) events.append('test') - self.addAsyncCleanup(self.on_cleanup2) + VAR.set(VAR.get() + ('test',)) + self.addCleanup(self.on_cleanup5) + self.addAsyncCleanup(self.on_cleanup6) async def asyncTearDown(self): - self.assertEqual(events, ['setUp', - 'asyncSetUp', - 'test']) + self.assertEqual(events, expected[:3]) + VAR.set(VAR.get() + ('asyncTearDown',)) events.append('asyncTearDown') def tearDown(self): - self.assertEqual(events, ['setUp', - 'asyncSetUp', - 'test', - 'asyncTearDown']) + self.assertEqual(events, expected[:4]) events.append('tearDown') + VAR.set(VAR.get() + ('tearDown',)) - async def on_cleanup1(self): - self.assertEqual(events, ['setUp', - 'asyncSetUp', - 'test', - 'asyncTearDown', - 'tearDown', - 'cleanup2']) + def on_cleanup1(self): + self.assertEqual(events, expected[:10]) events.append('cleanup1') + VAR.set(VAR.get() + ('cleanup1',)) + nonlocal cvar + cvar = VAR.get() async def on_cleanup2(self): - self.assertEqual(events, ['setUp', - 'asyncSetUp', - 'test', - 'asyncTearDown', - 'tearDown']) + self.assertEqual(events, expected[:9]) events.append('cleanup2') + VAR.set(VAR.get() + ('cleanup2',)) + + def on_cleanup3(self): + self.assertEqual(events, expected[:8]) + events.append('cleanup3') + VAR.set(VAR.get() + ('cleanup3',)) + + async def on_cleanup4(self): + self.assertEqual(events, expected[:7]) + events.append('cleanup4') + VAR.set(VAR.get() + ('cleanup4',)) + + def on_cleanup5(self): + self.assertEqual(events, expected[:6]) + events.append('cleanup5') + VAR.set(VAR.get() + ('cleanup5',)) + + async def on_cleanup6(self): + self.assertEqual(events, expected[:5]) + events.append('cleanup6') + VAR.set(VAR.get() + ('cleanup6',)) events = [] + cvar = () test = Test("test_func") result = test.run() self.assertEqual(result.errors, []) self.assertEqual(result.failures, []) - expected = ['setUp', 'asyncSetUp', 'test', - 'asyncTearDown', 'tearDown', 'cleanup2', 'cleanup1'] self.assertEqual(events, expected) + self.assertEqual(cvar, tuple(expected)) events = [] + cvar = () test = Test("test_func") test.debug() self.assertEqual(events, expected) + self.assertEqual(cvar, tuple(expected)) test.doCleanups() self.assertEqual(events, expected) + self.assertEqual(cvar, tuple(expected)) def test_exception_in_setup(self): class Test(unittest.IsolatedAsyncioTestCase): @@ -109,6 +170,7 @@ async def on_cleanup(self): events = [] test = Test("test_func") + self.addCleanup(test._tearDownAsyncioRunner) try: test.debug() except MyException: @@ -144,6 +206,7 @@ async def on_cleanup(self): events = [] test = Test("test_func") + self.addCleanup(test._tearDownAsyncioRunner) try: test.debug() except MyException: @@ -179,6 +242,7 @@ async def on_cleanup(self): events = [] test = Test("test_func") + self.addCleanup(test._tearDownAsyncioRunner) try: test.debug() except MyException: @@ -220,6 +284,7 @@ async def on_cleanup2(self): events = [] test = Test("test_func") + self.addCleanup(test._tearDownAsyncioRunner) try: test.debug() except MyException: @@ -230,6 +295,37 @@ async def on_cleanup2(self): test.doCleanups() self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1']) + def test_deprecation_of_return_val_from_test(self): + # Issue 41322 - deprecate return of value that is not None from a test + class Nothing: + def __eq__(self, o): + return o is None + class Test(unittest.IsolatedAsyncioTestCase): + async def test1(self): + return 1 + async def test2(self): + yield 1 + async def test3(self): + return Nothing() + + with self.assertWarns(DeprecationWarning) as w: + Test('test1').run() + self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) + self.assertIn('test1', str(w.warning)) + self.assertEqual(w.filename, __file__) + + with self.assertWarns(DeprecationWarning) as w: + Test('test2').run() + self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) + self.assertIn('test2', str(w.warning)) + self.assertEqual(w.filename, __file__) + + with self.assertWarns(DeprecationWarning) as w: + Test('test3').run() + self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) + self.assertIn('test3', str(w.warning)) + self.assertEqual(w.filename, __file__) + def test_cleanups_interleave_order(self): events = [] @@ -298,6 +394,36 @@ async def coro(): output = test.run() self.assertTrue(cancelled) + def test_enterAsyncContext(self): + events = [] + + class Test(unittest.IsolatedAsyncioTestCase): + async def test_func(slf): + slf.addAsyncCleanup(events.append, 'cleanup1') + cm = TestCM(events, 42) + self.assertEqual(await slf.enterAsyncContext(cm), 42) + slf.addAsyncCleanup(events.append, 'cleanup2') + events.append('test') + + test = Test('test_func') + output = test.run() + self.assertTrue(output.wasSuccessful(), output) + self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1']) + + def test_enterAsyncContext_arg_errors(self): + class Test(unittest.IsolatedAsyncioTestCase): + async def test_func(slf): + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await slf.enterAsyncContext(LacksEnterAndExit()) + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await slf.enterAsyncContext(LacksEnter()) + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await slf.enterAsyncContext(LacksExit()) + + test = Test('test_func') + output = test.run() + self.assertTrue(output.wasSuccessful()) + def test_debug_cleanup_same_loop(self): class Test(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): @@ -332,6 +458,7 @@ async def cleanup(self, fut): events = [] test = Test("test_func") + self.addCleanup(test._tearDownAsyncioRunner) try: test.debug() except MyException: @@ -342,6 +469,21 @@ async def cleanup(self, fut): test.doCleanups() self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup']) + def test_setup_get_event_loop(self): + # See https://github.com/python/cpython/issues/95736 + # Make sure the default event loop is not used + asyncio.set_event_loop(None) + + class TestCase1(unittest.IsolatedAsyncioTestCase): + def setUp(self): + asyncio.get_event_loop_policy().get_event_loop() + + async def test_demo1(self): + pass + + test = TestCase1('test_demo1') + result = test.run() + self.assertTrue(result.wasSuccessful()) if __name__ == "__main__": unittest.main() diff --git a/Lib/unittest/test/test_break.py b/Lib/unittest/test/test_break.py index 9f68a393f4..33cbdd2661 100644 --- a/Lib/unittest/test/test_break.py +++ b/Lib/unittest/test/test_break.py @@ -4,14 +4,18 @@ import sys import signal import weakref - import unittest +from test import support + @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") class TestBreak(unittest.TestCase): int_handler = None + # This number was smart-guessed, previously tests were failing + # after 7th run. So, we take `x * 2 + 1` to be sure. + default_repeats = 15 def setUp(self): self._default_handler = signal.getsignal(signal.SIGINT) @@ -24,6 +28,27 @@ def tearDown(self): unittest.signals._interrupt_handler = None + def withRepeats(self, test_function, repeats=None): + if not support.check_impl_detail(cpython=True): + # Override repeats count on non-cpython to execute only once. + # Because this test only makes sense to be repeated on CPython. + repeats = 1 + elif repeats is None: + repeats = self.default_repeats + + for repeat in range(repeats): + with self.subTest(repeat=repeat): + # We don't run `setUp` for the very first repeat + # and we don't run `tearDown` for the very last one, + # because they are handled by the test class itself. + if repeat != 0: + self.setUp() + try: + test_function() + finally: + if repeat != repeats - 1: + self.tearDown() + def testInstallHandler(self): default_handler = signal.getsignal(signal.SIGINT) unittest.installHandler() @@ -48,35 +73,34 @@ def testRegisterResult(self): unittest.removeResult(result) def testInterruptCaught(self): - default_handler = signal.getsignal(signal.SIGINT) - - result = unittest.TestResult() - unittest.installHandler() - unittest.registerResult(result) - - self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) - def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) - try: - test(result) - except KeyboardInterrupt: - self.fail("KeyboardInterrupt not handled") - self.assertTrue(result.breakCaught) + def test_function(): + result = unittest.TestResult() + unittest.installHandler() + unittest.registerResult(result) + self.assertNotEqual( + signal.getsignal(signal.SIGINT), + self._default_handler, + ) + + try: + test(result) + except KeyboardInterrupt: + self.fail("KeyboardInterrupt not handled") + self.assertTrue(result.breakCaught) + self.withRepeats(test_function) def testSecondInterrupt(self): # Can't use skipIf decorator because the signal handler may have # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") - result = unittest.TestResult() - unittest.installHandler() - unittest.registerResult(result) def test(result): pid = os.getpid() @@ -86,40 +110,40 @@ def test(result): os.kill(pid, signal.SIGINT) self.fail("Second KeyboardInterrupt not raised") - try: - test(result) - except KeyboardInterrupt: - pass - else: - self.fail("Second KeyboardInterrupt not raised") - self.assertTrue(result.breakCaught) + def test_function(): + result = unittest.TestResult() + unittest.installHandler() + unittest.registerResult(result) + with self.assertRaises(KeyboardInterrupt): + test(result) + self.assertTrue(result.breakCaught) + self.withRepeats(test_function) - def testTwoResults(self): - unittest.installHandler() - result = unittest.TestResult() - unittest.registerResult(result) - new_handler = signal.getsignal(signal.SIGINT) + def testTwoResults(self): + def test_function(): + unittest.installHandler() - result2 = unittest.TestResult() - unittest.registerResult(result2) - self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) + result = unittest.TestResult() + unittest.registerResult(result) + new_handler = signal.getsignal(signal.SIGINT) - result3 = unittest.TestResult() + result2 = unittest.TestResult() + unittest.registerResult(result2) + self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) - def test(result): - pid = os.getpid() - os.kill(pid, signal.SIGINT) + result3 = unittest.TestResult() - try: - test(result) - except KeyboardInterrupt: - self.fail("KeyboardInterrupt not handled") + try: + os.kill(os.getpid(), signal.SIGINT) + except KeyboardInterrupt: + self.fail("KeyboardInterrupt not handled") - self.assertTrue(result.shouldStop) - self.assertTrue(result2.shouldStop) - self.assertFalse(result3.shouldStop) + self.assertTrue(result.shouldStop) + self.assertTrue(result2.shouldStop) + self.assertFalse(result3.shouldStop) + self.withRepeats(test_function) def testHandlerReplacedButCalled(self): @@ -127,23 +151,25 @@ def testHandlerReplacedButCalled(self): # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") - # If our handler has been replaced (is no longer installed) but is - # called by the *new* handler, then it isn't safe to delay the - # SIGINT and we should immediately delegate to the default handler - unittest.installHandler() - - handler = signal.getsignal(signal.SIGINT) - def new_handler(frame, signum): - handler(frame, signum) - signal.signal(signal.SIGINT, new_handler) - try: - pid = os.getpid() - os.kill(pid, signal.SIGINT) - except KeyboardInterrupt: - pass - else: - self.fail("replaced but delegated handler doesn't raise interrupt") + def test_function(): + # If our handler has been replaced (is no longer installed) but is + # called by the *new* handler, then it isn't safe to delay the + # SIGINT and we should immediately delegate to the default handler + unittest.installHandler() + + handler = signal.getsignal(signal.SIGINT) + def new_handler(frame, signum): + handler(frame, signum) + signal.signal(signal.SIGINT, new_handler) + + try: + os.kill(os.getpid(), signal.SIGINT) + except KeyboardInterrupt: + pass + else: + self.fail("replaced but delegated handler doesn't raise interrupt") + self.withRepeats(test_function) def testRunner(self): # Creating a TextTestRunner with the appropriate argument should @@ -162,8 +188,7 @@ def testWeakReferences(self): del result # For non-reference counting implementations - # XXX RUSTPYTHON TODO: gc module - # gc.collect();gc.collect() + gc.collect();gc.collect() self.assertIsNone(ref()) diff --git a/Lib/unittest/test/test_case.py b/Lib/unittest/test/test_case.py index 796594ffd2..e2547f4777 100644 --- a/Lib/unittest/test/test_case.py +++ b/Lib/unittest/test/test_case.py @@ -197,8 +197,8 @@ def test(self): super(Foo, self).test() raise RuntimeError('raised by Foo.test') - expected = ['startTest', 'setUp', 'test', 'tearDown', - 'addError', 'stopTest'] + expected = ['startTest', 'setUp', 'test', + 'addError', 'tearDown', 'stopTest'] Foo(events).run(result) self.assertEqual(events, expected) @@ -216,7 +216,7 @@ def test(self): raise RuntimeError('raised by Foo.test') expected = ['startTestRun', 'startTest', 'setUp', 'test', - 'tearDown', 'addError', 'stopTest', 'stopTestRun'] + 'addError', 'tearDown', 'stopTest', 'stopTestRun'] Foo(events).run() self.assertEqual(events, expected) @@ -236,8 +236,8 @@ def test(self): super(Foo, self).test() self.fail('raised by Foo.test') - expected = ['startTest', 'setUp', 'test', 'tearDown', - 'addFailure', 'stopTest'] + expected = ['startTest', 'setUp', 'test', + 'addFailure', 'tearDown', 'stopTest'] Foo(events).run(result) self.assertEqual(events, expected) @@ -252,7 +252,7 @@ def test(self): self.fail('raised by Foo.test') expected = ['startTestRun', 'startTest', 'setUp', 'test', - 'tearDown', 'addFailure', 'stopTest', 'stopTestRun'] + 'addFailure', 'tearDown', 'stopTest', 'stopTestRun'] events = [] Foo(events).run() self.assertEqual(events, expected) @@ -306,6 +306,37 @@ def test(self): Foo('test').run() + def test_deprecation_of_return_val_from_test(self): + # Issue 41322 - deprecate return of value that is not None from a test + class Nothing: + def __eq__(self, o): + return o is None + class Foo(unittest.TestCase): + def test1(self): + return 1 + def test2(self): + yield 1 + def test3(self): + return Nothing() + + with self.assertWarns(DeprecationWarning) as w: + Foo('test1').run() + self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) + self.assertIn('test1', str(w.warning)) + self.assertEqual(w.filename, __file__) + + with self.assertWarns(DeprecationWarning) as w: + Foo('test2').run() + self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) + self.assertIn('test2', str(w.warning)) + self.assertEqual(w.filename, __file__) + + with self.assertWarns(DeprecationWarning) as w: + Foo('test3').run() + self.assertIn('It is deprecated to return a value that is not None', str(w.warning)) + self.assertIn('test3', str(w.warning)) + self.assertEqual(w.filename, __file__) + def _check_call_order__subtests(self, result, events, expected_events): class Foo(Test.LoggingTestCase): def test(self): @@ -333,10 +364,10 @@ def test(self): def test_run_call_order__subtests(self): events = [] result = LoggingResult(events) - expected = ['startTest', 'setUp', 'test', 'tearDown', + expected = ['startTest', 'setUp', 'test', 'addSubTestFailure', 'addSubTestSuccess', 'addSubTestFailure', 'addSubTestFailure', - 'addSubTestSuccess', 'addError', 'stopTest'] + 'addSubTestSuccess', 'addError', 'tearDown', 'stopTest'] self._check_call_order__subtests(result, events, expected) def test_run_call_order__subtests_legacy(self): @@ -344,8 +375,8 @@ def test_run_call_order__subtests_legacy(self): # text execution stops after the first subtest failure. events = [] result = LegacyLoggingResult(events) - expected = ['startTest', 'setUp', 'test', 'tearDown', - 'addFailure', 'stopTest'] + expected = ['startTest', 'setUp', 'test', + 'addFailure', 'tearDown', 'stopTest'] self._check_call_order__subtests(result, events, expected) def _check_call_order__subtests_success(self, result, events, expected_events): @@ -366,9 +397,9 @@ def test_run_call_order__subtests_success(self): result = LoggingResult(events) # The 6 subtest successes are individually recorded, in addition # to the whole test success. - expected = (['startTest', 'setUp', 'test', 'tearDown'] + expected = (['startTest', 'setUp', 'test'] + 6 * ['addSubTestSuccess'] - + ['addSuccess', 'stopTest']) + + ['tearDown', 'addSuccess', 'stopTest']) self._check_call_order__subtests_success(result, events, expected) def test_run_call_order__subtests_success_legacy(self): @@ -393,8 +424,8 @@ def test(self): self.fail('failure') self.fail('failure') - expected = ['startTest', 'setUp', 'test', 'tearDown', - 'addSubTestFailure', 'stopTest'] + expected = ['startTest', 'setUp', 'test', + 'addSubTestFailure', 'tearDown', 'stopTest'] Foo(events).run(result) self.assertEqual(events, expected) @@ -420,7 +451,7 @@ def test_c(self): result = unittest.TestResult() result.failfast = True - suite = unittest.makeSuite(Foo) + suite = unittest.TestLoader().loadTestsFromTestCase(Foo) suite.run(result) expected = ['a1', 'a2', 'b1'] @@ -708,10 +739,6 @@ def testAssertDictContainsSubset(self): with self.assertRaises(self.failureException): self.assertDictContainsSubset({'foo': one}, {'foo': '\uFFFD'}) - with self.assertWarns(DeprecationWarning) as warninfo: - self.assertDictContainsSubset({}, {}) - self.assertEqual(warninfo.warnings[0].filename, __file__) - def testAssertEqual(self): equal_pairs = [ ((), ()), diff --git a/Lib/unittest/test/test_discovery.py b/Lib/unittest/test/test_discovery.py index 9d502c51fb..3b58786ec1 100644 --- a/Lib/unittest/test/test_discovery.py +++ b/Lib/unittest/test/test_discovery.py @@ -396,7 +396,7 @@ def restore_isdir(): self.addCleanup(restore_isdir) _find_tests_args = [] - def _find_tests(start_dir, pattern, namespace=None): + def _find_tests(start_dir, pattern): _find_tests_args.append((start_dir, pattern)) return ['tests'] loader._find_tests = _find_tests @@ -792,7 +792,7 @@ def test_discovery_from_dotted_path(self): expectedPath = os.path.abspath(os.path.dirname(unittest.test.__file__)) self.wasRun = False - def _find_tests(start_dir, pattern, namespace=None): + def _find_tests(start_dir, pattern): self.wasRun = True self.assertEqual(start_dir, expectedPath) return tests @@ -825,37 +825,6 @@ def restore(): 'Can not use builtin modules ' 'as dotted module names') - def test_discovery_from_dotted_namespace_packages(self): - loader = unittest.TestLoader() - - package = types.ModuleType('package') - package.__path__ = ['/a', '/b'] - package.__spec__ = types.SimpleNamespace( - loader=None, - submodule_search_locations=['/a', '/b'] - ) - - def _import(packagename, *args, **kwargs): - sys.modules[packagename] = package - return package - - _find_tests_args = [] - def _find_tests(start_dir, pattern, namespace=None): - _find_tests_args.append((start_dir, pattern)) - return ['%s/tests' % start_dir] - - loader._find_tests = _find_tests - loader.suiteClass = list - - with unittest.mock.patch('builtins.__import__', _import): - # Since loader.discover() can modify sys.path, restore it when done. - with import_helper.DirsOnSysPath(): - # Make sure to remove 'package' from sys.modules when done. - with test.test_importlib.util.uncache('package'): - suite = loader.discover('package') - - self.assertEqual(suite, ['/a/tests', '/b/tests']) - def test_discovery_failed_discovery(self): loader = unittest.TestLoader() package = types.ModuleType('package') diff --git a/Lib/unittest/test/test_functiontestcase.py b/Lib/unittest/test/test_functiontestcase.py index c5f2bcbe74..4971729880 100644 --- a/Lib/unittest/test/test_functiontestcase.py +++ b/Lib/unittest/test/test_functiontestcase.py @@ -58,8 +58,8 @@ def test(): def tearDown(): events.append('tearDown') - expected = ['startTest', 'setUp', 'test', 'tearDown', - 'addError', 'stopTest'] + expected = ['startTest', 'setUp', 'test', + 'addError', 'tearDown', 'stopTest'] unittest.FunctionTestCase(test, setUp, tearDown).run(result) self.assertEqual(events, expected) @@ -84,8 +84,8 @@ def test(): def tearDown(): events.append('tearDown') - expected = ['startTest', 'setUp', 'test', 'tearDown', - 'addFailure', 'stopTest'] + expected = ['startTest', 'setUp', 'test', + 'addFailure', 'tearDown', 'stopTest'] unittest.FunctionTestCase(test, setUp, tearDown).run(result) self.assertEqual(events, expected) diff --git a/Lib/unittest/test/test_loader.py b/Lib/unittest/test/test_loader.py index bc54bf0553..de2268cda9 100644 --- a/Lib/unittest/test/test_loader.py +++ b/Lib/unittest/test/test_loader.py @@ -1591,5 +1591,52 @@ class Foo(unittest.TestCase): self.assertEqual(loader.getTestCaseNames(Foo), test_names) +class TestObsoleteFunctions(unittest.TestCase): + class MyTestSuite(unittest.TestSuite): + pass + + class MyTestCase(unittest.TestCase): + def check_1(self): pass + def check_2(self): pass + def test(self): pass + + @staticmethod + def reverse_three_way_cmp(a, b): + return unittest.util.three_way_cmp(b, a) + + def test_getTestCaseNames(self): + with self.assertWarns(DeprecationWarning) as w: + tests = unittest.getTestCaseNames(self.MyTestCase, + prefix='check', sortUsing=self.reverse_three_way_cmp, + testNamePatterns=None) + self.assertEqual(w.filename, __file__) + self.assertEqual(tests, ['check_2', 'check_1']) + + def test_makeSuite(self): + with self.assertWarns(DeprecationWarning) as w: + suite = unittest.makeSuite(self.MyTestCase, + prefix='check', sortUsing=self.reverse_three_way_cmp, + suiteClass=self.MyTestSuite) + self.assertEqual(w.filename, __file__) + self.assertIsInstance(suite, self.MyTestSuite) + expected = self.MyTestSuite([self.MyTestCase('check_2'), + self.MyTestCase('check_1')]) + self.assertEqual(suite, expected) + + def test_findTestCases(self): + m = types.ModuleType('m') + m.testcase_1 = self.MyTestCase + + with self.assertWarns(DeprecationWarning) as w: + suite = unittest.findTestCases(m, + prefix='check', sortUsing=self.reverse_three_way_cmp, + suiteClass=self.MyTestSuite) + self.assertEqual(w.filename, __file__) + self.assertIsInstance(suite, self.MyTestSuite) + expected = [self.MyTestSuite([self.MyTestCase('check_2'), + self.MyTestCase('check_1')])] + self.assertEqual(list(suite), expected) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/unittest/test/test_program.py b/Lib/unittest/test/test_program.py index b7fbbc1e7b..26a8550af8 100644 --- a/Lib/unittest/test/test_program.py +++ b/Lib/unittest/test/test_program.py @@ -61,6 +61,17 @@ def testPass(self): pass def testFail(self): raise AssertionError + def testError(self): + 1/0 + @unittest.skip('skipping') + def testSkipped(self): + raise AssertionError + @unittest.expectedFailure + def testExpectedFailure(self): + raise AssertionError + @unittest.expectedFailure + def testUnexpectedSuccess(self): + pass class FooBarLoader(unittest.TestLoader): """Test loader that returns a suite containing FooBar.""" @@ -111,9 +122,13 @@ def test_NonExit(self): testRunner=unittest.TextTestRunner(stream=stream), testLoader=self.FooBarLoader()) self.assertTrue(hasattr(program, 'result')) - self.assertIn('\nFAIL: testFail ', stream.getvalue()) - self.assertTrue(stream.getvalue().endswith('\n\nFAILED (failures=1)\n')) - + out = stream.getvalue() + self.assertIn('\nFAIL: testFail ', out) + self.assertIn('\nERROR: testError ', out) + self.assertIn('\nUNEXPECTED SUCCESS: testUnexpectedSuccess ', out) + expected = ('\n\nFAILED (failures=1, errors=1, skipped=1, ' + 'expected failures=1, unexpected successes=1)\n') + self.assertTrue(out.endswith(expected)) def test_Exit(self): stream = BufferedWriter() @@ -124,9 +139,13 @@ def test_Exit(self): testRunner=unittest.TextTestRunner(stream=stream), exit=True, testLoader=self.FooBarLoader()) - self.assertIn('\nFAIL: testFail ', stream.getvalue()) - self.assertTrue(stream.getvalue().endswith('\n\nFAILED (failures=1)\n')) - + out = stream.getvalue() + self.assertIn('\nFAIL: testFail ', out) + self.assertIn('\nERROR: testError ', out) + self.assertIn('\nUNEXPECTED SUCCESS: testUnexpectedSuccess ', out) + expected = ('\n\nFAILED (failures=1, errors=1, skipped=1, ' + 'expected failures=1, unexpected successes=1)\n') + self.assertTrue(out.endswith(expected)) def test_ExitAsDefault(self): stream = BufferedWriter() @@ -136,8 +155,13 @@ def test_ExitAsDefault(self): argv=["foobar"], testRunner=unittest.TextTestRunner(stream=stream), testLoader=self.FooBarLoader()) - self.assertIn('\nFAIL: testFail ', stream.getvalue()) - self.assertTrue(stream.getvalue().endswith('\n\nFAILED (failures=1)\n')) + out = stream.getvalue() + self.assertIn('\nFAIL: testFail ', out) + self.assertIn('\nERROR: testError ', out) + self.assertIn('\nUNEXPECTED SUCCESS: testUnexpectedSuccess ', out) + expected = ('\n\nFAILED (failures=1, errors=1, skipped=1, ' + 'expected failures=1, unexpected successes=1)\n') + self.assertTrue(out.endswith(expected)) class InitialisableProgram(unittest.TestProgram): @@ -172,6 +196,7 @@ def run(self, test): return RESULT +@support.requires_subprocess() class TestCommandLineArgs(unittest.TestCase): def setUp(self): @@ -429,7 +454,9 @@ def testParseArgsSelectedTestNames(self): def testSelectedTestNamesFunctionalTest(self): def run_unittest(args): - p = subprocess.Popen([sys.executable, '-m', 'unittest'] + args, + # Use -E to ignore PYTHONSAFEPATH env var + cmd = [sys.executable, '-E', '-m', 'unittest'] + args + p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, cwd=os.path.dirname(__file__)) with p: _, stderr = p.communicate() diff --git a/Lib/unittest/test/test_result.py b/Lib/unittest/test/test_result.py index c5aaba0ff5..9320b0a44b 100644 --- a/Lib/unittest/test/test_result.py +++ b/Lib/unittest/test/test_result.py @@ -275,6 +275,62 @@ def get_exc_info(): self.assertEqual(len(dropped), 1) self.assertIn("raise self.failureException(msg)", dropped[0]) + def test_addFailure_filter_traceback_frames_chained_exception_self_loop(self): + class Foo(unittest.TestCase): + def test_1(self): + pass + + def get_exc_info(): + try: + loop = Exception("Loop") + loop.__cause__ = loop + loop.__context__ = loop + raise loop + except: + return sys.exc_info() + + exc_info_tuple = get_exc_info() + + test = Foo('test_1') + result = unittest.TestResult() + result.startTest(test) + result.addFailure(test, exc_info_tuple) + result.stopTest(test) + + formatted_exc = result.failures[0][1] + self.assertEqual(formatted_exc.count("Exception: Loop\n"), 1) + + def test_addFailure_filter_traceback_frames_chained_exception_cycle(self): + class Foo(unittest.TestCase): + def test_1(self): + pass + + def get_exc_info(): + try: + # Create two directionally opposed cycles + # __cause__ in one direction, __context__ in the other + A, B, C = Exception("A"), Exception("B"), Exception("C") + edges = [(C, B), (B, A), (A, C)] + for ex1, ex2 in edges: + ex1.__cause__ = ex2 + ex2.__context__ = ex1 + raise C + except: + return sys.exc_info() + + exc_info_tuple = get_exc_info() + + test = Foo('test_1') + result = unittest.TestResult() + result.startTest(test) + result.addFailure(test, exc_info_tuple) + result.stopTest(test) + + formatted_exc = result.failures[0][1] + self.assertEqual(formatted_exc.count("Exception: A\n"), 1) + self.assertEqual(formatted_exc.count("Exception: B\n"), 1) + self.assertEqual(formatted_exc.count("Exception: C\n"), 1) + # "addError(test, err)" # ... # "Called when the test case test raises an unexpected exception err @@ -376,12 +432,54 @@ def test_1(self): self.assertIs(test_case, subtest) self.assertIn("some recognizable failure", formatted_exc) + def testStackFrameTrimming(self): + class Frame(object): + class tb_frame(object): + f_globals = {} + result = unittest.TestResult() + self.assertFalse(result._is_relevant_tb_level(Frame)) + + Frame.tb_frame.f_globals['__unittest'] = True + self.assertTrue(result._is_relevant_tb_level(Frame)) + + def testFailFast(self): + result = unittest.TestResult() + result._exc_info_to_string = lambda *_: '' + result.failfast = True + result.addError(None, None) + self.assertTrue(result.shouldStop) + + result = unittest.TestResult() + result._exc_info_to_string = lambda *_: '' + result.failfast = True + result.addFailure(None, None) + self.assertTrue(result.shouldStop) + + result = unittest.TestResult() + result._exc_info_to_string = lambda *_: '' + result.failfast = True + result.addUnexpectedSuccess(None) + self.assertTrue(result.shouldStop) + + def testFailFastSetByRunner(self): + stream = BufferedWriter() + runner = unittest.TextTestRunner(stream=stream, failfast=True) + def test(result): + self.assertTrue(result.failfast) + result = runner.run(test) + stream.flush() + self.assertTrue(stream.getvalue().endswith('\n\nOK\n')) + + +class Test_TextTestResult(unittest.TestCase): + maxDiff = None + def testGetDescriptionWithoutDocstring(self): result = unittest.TextTestResult(None, True, 1) self.assertEqual( result.getDescription(self), 'testGetDescriptionWithoutDocstring (' + __name__ + - '.Test_TestResult)') + '.Test_TextTestResult.testGetDescriptionWithoutDocstring)') def testGetSubTestDescriptionWithoutDocstring(self): with self.subTest(foo=1, bar=2): @@ -389,13 +487,14 @@ def testGetSubTestDescriptionWithoutDocstring(self): self.assertEqual( result.getDescription(self._subtest), 'testGetSubTestDescriptionWithoutDocstring (' + __name__ + - '.Test_TestResult) (foo=1, bar=2)') + '.Test_TextTestResult.testGetSubTestDescriptionWithoutDocstring) (foo=1, bar=2)') + with self.subTest('some message'): result = unittest.TextTestResult(None, True, 1) self.assertEqual( result.getDescription(self._subtest), 'testGetSubTestDescriptionWithoutDocstring (' + __name__ + - '.Test_TestResult) [some message]') + '.Test_TextTestResult.testGetSubTestDescriptionWithoutDocstring) [some message]') def testGetSubTestDescriptionWithoutDocstringAndParams(self): with self.subTest(): @@ -403,10 +502,11 @@ def testGetSubTestDescriptionWithoutDocstringAndParams(self): self.assertEqual( result.getDescription(self._subtest), 'testGetSubTestDescriptionWithoutDocstringAndParams ' - '(' + __name__ + '.Test_TestResult) ()') + '(' + __name__ + '.Test_TextTestResult.testGetSubTestDescriptionWithoutDocstringAndParams) ' + '()') def testGetSubTestDescriptionForFalsyValues(self): - expected = 'testGetSubTestDescriptionForFalsyValues (%s.Test_TestResult) [%s]' + expected = 'testGetSubTestDescriptionForFalsyValues (%s.Test_TextTestResult.testGetSubTestDescriptionForFalsyValues) [%s]' result = unittest.TextTestResult(None, True, 1) for arg in [0, None, []]: with self.subTest(arg): @@ -422,7 +522,8 @@ def testGetNestedSubTestDescriptionWithoutDocstring(self): self.assertEqual( result.getDescription(self._subtest), 'testGetNestedSubTestDescriptionWithoutDocstring ' - '(' + __name__ + '.Test_TestResult) (baz=2, bar=3, foo=1)') + '(' + __name__ + '.Test_TextTestResult.testGetNestedSubTestDescriptionWithoutDocstring) ' + '(baz=2, bar=3, foo=1)') def testGetDuplicatedNestedSubTestDescriptionWithoutDocstring(self): with self.subTest(foo=1, bar=2): @@ -431,7 +532,7 @@ def testGetDuplicatedNestedSubTestDescriptionWithoutDocstring(self): self.assertEqual( result.getDescription(self._subtest), 'testGetDuplicatedNestedSubTestDescriptionWithoutDocstring ' - '(' + __name__ + '.Test_TestResult) (baz=3, bar=4, foo=1)') + '(' + __name__ + '.Test_TextTestResult.testGetDuplicatedNestedSubTestDescriptionWithoutDocstring) (baz=3, bar=4, foo=1)') @unittest.skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") @@ -441,7 +542,7 @@ def testGetDescriptionWithOneLineDocstring(self): self.assertEqual( result.getDescription(self), ('testGetDescriptionWithOneLineDocstring ' - '(' + __name__ + '.Test_TestResult)\n' + '(' + __name__ + '.Test_TextTestResult.testGetDescriptionWithOneLineDocstring)\n' 'Tests getDescription() for a method with a docstring.')) @unittest.skipIf(sys.flags.optimize >= 2, @@ -453,7 +554,9 @@ def testGetSubTestDescriptionWithOneLineDocstring(self): self.assertEqual( result.getDescription(self._subtest), ('testGetSubTestDescriptionWithOneLineDocstring ' - '(' + __name__ + '.Test_TestResult) (foo=1, bar=2)\n' + '(' + __name__ + '.Test_TextTestResult.testGetSubTestDescriptionWithOneLineDocstring) ' + '(foo=1, bar=2)\n' + 'Tests getDescription() for a method with a docstring.')) @unittest.skipIf(sys.flags.optimize >= 2, @@ -466,7 +569,7 @@ def testGetDescriptionWithMultiLineDocstring(self): self.assertEqual( result.getDescription(self), ('testGetDescriptionWithMultiLineDocstring ' - '(' + __name__ + '.Test_TestResult)\n' + '(' + __name__ + '.Test_TextTestResult.testGetDescriptionWithMultiLineDocstring)\n' 'Tests getDescription() for a method with a longer ' 'docstring.')) @@ -481,47 +584,124 @@ def testGetSubTestDescriptionWithMultiLineDocstring(self): self.assertEqual( result.getDescription(self._subtest), ('testGetSubTestDescriptionWithMultiLineDocstring ' - '(' + __name__ + '.Test_TestResult) (foo=1, bar=2)\n' + '(' + __name__ + '.Test_TextTestResult.testGetSubTestDescriptionWithMultiLineDocstring) ' + '(foo=1, bar=2)\n' 'Tests getDescription() for a method with a longer ' 'docstring.')) - def testStackFrameTrimming(self): - class Frame(object): - class tb_frame(object): - f_globals = {} - result = unittest.TestResult() - self.assertFalse(result._is_relevant_tb_level(Frame)) - - Frame.tb_frame.f_globals['__unittest'] = True - self.assertTrue(result._is_relevant_tb_level(Frame)) - - def testFailFast(self): - result = unittest.TestResult() - result._exc_info_to_string = lambda *_: '' - result.failfast = True - result.addError(None, None) - self.assertTrue(result.shouldStop) - - result = unittest.TestResult() - result._exc_info_to_string = lambda *_: '' - result.failfast = True - result.addFailure(None, None) - self.assertTrue(result.shouldStop) - - result = unittest.TestResult() - result._exc_info_to_string = lambda *_: '' - result.failfast = True - result.addUnexpectedSuccess(None) - self.assertTrue(result.shouldStop) - - def testFailFastSetByRunner(self): + class Test(unittest.TestCase): + def testSuccess(self): + pass + def testSkip(self): + self.skipTest('skip') + def testFail(self): + self.fail('fail') + def testError(self): + raise Exception('error') + @unittest.expectedFailure + def testExpectedFailure(self): + self.fail('fail') + @unittest.expectedFailure + def testUnexpectedSuccess(self): + pass + def testSubTestSuccess(self): + with self.subTest('one', a=1): + pass + with self.subTest('two', b=2): + pass + def testSubTestMixed(self): + with self.subTest('success', a=1): + pass + with self.subTest('skip', b=2): + self.skipTest('skip') + with self.subTest('fail', c=3): + self.fail('fail') + with self.subTest('error', d=4): + raise Exception('error') + + tearDownError = None + def tearDown(self): + if self.tearDownError is not None: + raise self.tearDownError + + def _run_test(self, test_name, verbosity, tearDownError=None): stream = BufferedWriter() - runner = unittest.TextTestRunner(stream=stream, failfast=True) - def test(result): - self.assertTrue(result.failfast) - result = runner.run(test) - stream.flush() - self.assertTrue(stream.getvalue().endswith('\n\nOK\n')) + stream = unittest.runner._WritelnDecorator(stream) + result = unittest.TextTestResult(stream, True, verbosity) + test = self.Test(test_name) + test.tearDownError = tearDownError + test.run(result) + return stream.getvalue() + + def testDotsOutput(self): + self.assertEqual(self._run_test('testSuccess', 1), '.') + self.assertEqual(self._run_test('testSkip', 1), 's') + self.assertEqual(self._run_test('testFail', 1), 'F') + self.assertEqual(self._run_test('testError', 1), 'E') + self.assertEqual(self._run_test('testExpectedFailure', 1), 'x') + self.assertEqual(self._run_test('testUnexpectedSuccess', 1), 'u') + + def testLongOutput(self): + classname = f'{__name__}.{self.Test.__qualname__}' + self.assertEqual(self._run_test('testSuccess', 2), + f'testSuccess ({classname}.testSuccess) ... ok\n') + self.assertEqual(self._run_test('testSkip', 2), + f"testSkip ({classname}.testSkip) ... skipped 'skip'\n") + self.assertEqual(self._run_test('testFail', 2), + f'testFail ({classname}.testFail) ... FAIL\n') + self.assertEqual(self._run_test('testError', 2), + f'testError ({classname}.testError) ... ERROR\n') + self.assertEqual(self._run_test('testExpectedFailure', 2), + f'testExpectedFailure ({classname}.testExpectedFailure) ... expected failure\n') + self.assertEqual(self._run_test('testUnexpectedSuccess', 2), + f'testUnexpectedSuccess ({classname}.testUnexpectedSuccess) ... unexpected success\n') + + def testDotsOutputSubTestSuccess(self): + self.assertEqual(self._run_test('testSubTestSuccess', 1), '.') + + def testLongOutputSubTestSuccess(self): + classname = f'{__name__}.{self.Test.__qualname__}' + self.assertEqual(self._run_test('testSubTestSuccess', 2), + f'testSubTestSuccess ({classname}.testSubTestSuccess) ... ok\n') + + def testDotsOutputSubTestMixed(self): + self.assertEqual(self._run_test('testSubTestMixed', 1), 'sFE') + + def testLongOutputSubTestMixed(self): + classname = f'{__name__}.{self.Test.__qualname__}' + self.assertEqual(self._run_test('testSubTestMixed', 2), + f'testSubTestMixed ({classname}.testSubTestMixed) ... \n' + f" testSubTestMixed ({classname}.testSubTestMixed) [skip] (b=2) ... skipped 'skip'\n" + f' testSubTestMixed ({classname}.testSubTestMixed) [fail] (c=3) ... FAIL\n' + f' testSubTestMixed ({classname}.testSubTestMixed) [error] (d=4) ... ERROR\n') + + def testDotsOutputTearDownFail(self): + out = self._run_test('testSuccess', 1, AssertionError('fail')) + self.assertEqual(out, 'F') + out = self._run_test('testError', 1, AssertionError('fail')) + self.assertEqual(out, 'EF') + out = self._run_test('testFail', 1, Exception('error')) + self.assertEqual(out, 'FE') + out = self._run_test('testSkip', 1, AssertionError('fail')) + self.assertEqual(out, 'sF') + + def testLongOutputTearDownFail(self): + classname = f'{__name__}.{self.Test.__qualname__}' + out = self._run_test('testSuccess', 2, AssertionError('fail')) + self.assertEqual(out, + f'testSuccess ({classname}.testSuccess) ... FAIL\n') + out = self._run_test('testError', 2, AssertionError('fail')) + self.assertEqual(out, + f'testError ({classname}.testError) ... ERROR\n' + f'testError ({classname}.testError) ... FAIL\n') + out = self._run_test('testFail', 2, Exception('error')) + self.assertEqual(out, + f'testFail ({classname}.testFail) ... FAIL\n' + f'testFail ({classname}.testFail) ... ERROR\n') + out = self._run_test('testSkip', 2, AssertionError('fail')) + self.assertEqual(out, + f"testSkip ({classname}.testSkip) ... skipped 'skip'\n" + f'testSkip ({classname}.testSkip) ... FAIL\n') classDict = dict(unittest.TestResult.__dict__) @@ -734,7 +914,7 @@ def test_foo(self): expected_out = '\nStdout:\nset up\n' self.assertEqual(stdout.getvalue(), expected_out) self.assertEqual(len(result.errors), 1) - description = f'test_foo ({strclass(Foo)})' + description = f'test_foo ({strclass(Foo)}.test_foo)' test_case, formatted_exc = result.errors[0] self.assertEqual(str(test_case), description) self.assertIn('ZeroDivisionError: division by zero', formatted_exc) @@ -756,7 +936,7 @@ def test_foo(self): expected_out = '\nStdout:\ntear down\n' self.assertEqual(stdout.getvalue(), expected_out) self.assertEqual(len(result.errors), 1) - description = f'test_foo ({strclass(Foo)})' + description = f'test_foo ({strclass(Foo)}.test_foo)' test_case, formatted_exc = result.errors[0] self.assertEqual(str(test_case), description) self.assertIn('ZeroDivisionError: division by zero', formatted_exc) @@ -779,12 +959,13 @@ def test_foo(self): expected_out = '\nStdout:\nset up\ndo cleanup2\ndo cleanup1\n' self.assertEqual(stdout.getvalue(), expected_out) self.assertEqual(len(result.errors), 2) - description = f'test_foo ({strclass(Foo)})' + description = f'test_foo ({strclass(Foo)}.test_foo)' test_case, formatted_exc = result.errors[0] self.assertEqual(str(test_case), description) self.assertIn('ValueError: bad cleanup2', formatted_exc) self.assertNotIn('TypeError', formatted_exc) - self.assertIn(expected_out, formatted_exc) + self.assertIn('\nStdout:\nset up\ndo cleanup2\n', formatted_exc) + self.assertNotIn('\ndo cleanup1\n', formatted_exc) test_case, formatted_exc = result.errors[1] self.assertEqual(str(test_case), description) self.assertIn('TypeError: bad cleanup1', formatted_exc) @@ -809,19 +990,22 @@ def test_foo(self): expected_out = '\nStdout:\nset up\ndo cleanup2\ndo cleanup1\n' self.assertEqual(stdout.getvalue(), expected_out) self.assertEqual(len(result.errors), 3) - description = f'test_foo ({strclass(Foo)})' + description = f'test_foo ({strclass(Foo)}.test_foo)' test_case, formatted_exc = result.errors[0] self.assertEqual(str(test_case), description) self.assertIn('ZeroDivisionError: division by zero', formatted_exc) self.assertNotIn('ValueError', formatted_exc) self.assertNotIn('TypeError', formatted_exc) - self.assertIn(expected_out, formatted_exc) + self.assertIn('\nStdout:\nset up\n', formatted_exc) + self.assertNotIn('\ndo cleanup2\n', formatted_exc) + self.assertNotIn('\ndo cleanup1\n', formatted_exc) test_case, formatted_exc = result.errors[1] self.assertEqual(str(test_case), description) self.assertIn('ValueError: bad cleanup2', formatted_exc) self.assertNotIn('ZeroDivisionError', formatted_exc) self.assertNotIn('TypeError', formatted_exc) - self.assertIn(expected_out, formatted_exc) + self.assertIn('\nStdout:\nset up\ndo cleanup2\n', formatted_exc) + self.assertNotIn('\ndo cleanup1\n', formatted_exc) test_case, formatted_exc = result.errors[2] self.assertEqual(str(test_case), description) self.assertIn('TypeError: bad cleanup1', formatted_exc) @@ -849,19 +1033,22 @@ def test_foo(self): expected_out = '\nStdout:\nset up\ntear down\ndo cleanup2\ndo cleanup1\n' self.assertEqual(stdout.getvalue(), expected_out) self.assertEqual(len(result.errors), 3) - description = f'test_foo ({strclass(Foo)})' + description = f'test_foo ({strclass(Foo)}.test_foo)' test_case, formatted_exc = result.errors[0] self.assertEqual(str(test_case), description) self.assertIn('ZeroDivisionError: division by zero', formatted_exc) self.assertNotIn('ValueError', formatted_exc) self.assertNotIn('TypeError', formatted_exc) - self.assertIn(expected_out, formatted_exc) + self.assertIn('\nStdout:\nset up\ntear down\n', formatted_exc) + self.assertNotIn('\ndo cleanup2\n', formatted_exc) + self.assertNotIn('\ndo cleanup1\n', formatted_exc) test_case, formatted_exc = result.errors[1] self.assertEqual(str(test_case), description) self.assertIn('ValueError: bad cleanup2', formatted_exc) self.assertNotIn('ZeroDivisionError', formatted_exc) self.assertNotIn('TypeError', formatted_exc) - self.assertIn(expected_out, formatted_exc) + self.assertIn('\nStdout:\nset up\ntear down\ndo cleanup2\n', formatted_exc) + self.assertNotIn('\ndo cleanup1\n', formatted_exc) test_case, formatted_exc = result.errors[2] self.assertEqual(str(test_case), description) self.assertIn('TypeError: bad cleanup1', formatted_exc) diff --git a/Lib/unittest/test/test_runner.py b/Lib/unittest/test/test_runner.py index 6b2392f62e..7f24fccffb 100644 --- a/Lib/unittest/test/test_runner.py +++ b/Lib/unittest/test/test_runner.py @@ -3,6 +3,7 @@ import sys import pickle import subprocess +from test import support import unittest from unittest.case import _Outcome @@ -45,6 +46,29 @@ def cleanup(ordering, blowUp=False): raise Exception('CleanUpExc') +class TestCM: + def __init__(self, ordering, enter_result=None): + self.ordering = ordering + self.enter_result = enter_result + + def __enter__(self): + self.ordering.append('enter') + return self.enter_result + + def __exit__(self, *exc_info): + self.ordering.append('exit') + + +class LacksEnterAndExit: + pass +class LacksEnter: + def __exit__(self, *exc_info): + pass +class LacksExit: + def __enter__(self): + pass + + class TestCleanUp(unittest.TestCase): def testCleanUp(self): class TestableTest(unittest.TestCase): @@ -78,7 +102,8 @@ def testNothing(self): pass test = TestableTest('testNothing') - outcome = test._outcome = _Outcome() + result = unittest.TestResult() + outcome = test._outcome = _Outcome(result=result) CleanUpExc = Exception('foo') exc2 = Exception('bar') @@ -94,10 +119,13 @@ def cleanup2(): self.assertFalse(test.doCleanups()) self.assertFalse(outcome.success) - ((_, (Type1, instance1, _)), - (_, (Type2, instance2, _))) = reversed(outcome.errors) - self.assertEqual((Type1, instance1), (Exception, CleanUpExc)) - self.assertEqual((Type2, instance2), (Exception, exc2)) + (_, msg2), (_, msg1) = result.errors + self.assertIn('in cleanup1', msg1) + self.assertIn('raise CleanUpExc', msg1) + self.assertIn('Exception: foo', msg1) + self.assertIn('in cleanup2', msg2) + self.assertIn('raise exc2', msg2) + self.assertIn('Exception: bar', msg2) def testCleanupInRun(self): blowUp = False @@ -106,11 +134,13 @@ def testCleanupInRun(self): class TestableTest(unittest.TestCase): def setUp(self): ordering.append('setUp') + test.addCleanup(cleanup2) if blowUp: raise Exception('foo') def testNothing(self): ordering.append('test') + test.addCleanup(cleanup3) def tearDown(self): ordering.append('tearDown') @@ -121,8 +151,9 @@ def cleanup1(): ordering.append('cleanup1') def cleanup2(): ordering.append('cleanup2') + def cleanup3(): + ordering.append('cleanup3') test.addCleanup(cleanup1) - test.addCleanup(cleanup2) def success(some_test): self.assertEqual(some_test, test) @@ -132,7 +163,7 @@ def success(some_test): result.addSuccess = success test.run(result) - self.assertEqual(ordering, ['setUp', 'test', 'tearDown', + self.assertEqual(ordering, ['setUp', 'test', 'tearDown', 'cleanup3', 'cleanup2', 'cleanup1', 'success']) blowUp = True @@ -140,7 +171,7 @@ def success(some_test): test = TestableTest('testNothing') test.addCleanup(cleanup1) test.run(result) - self.assertEqual(ordering, ['setUp', 'cleanup1']) + self.assertEqual(ordering, ['setUp', 'cleanup2', 'cleanup1']) def testTestCaseDebugExecutesCleanups(self): ordering = [] @@ -152,9 +183,11 @@ def setUp(self): def testNothing(self): ordering.append('test') + self.addCleanup(cleanup3) def tearDown(self): ordering.append('tearDown') + test.addCleanup(cleanup4) test = TestableTest('testNothing') @@ -163,9 +196,47 @@ def cleanup1(): test.addCleanup(cleanup2) def cleanup2(): ordering.append('cleanup2') + def cleanup3(): + ordering.append('cleanup3') + def cleanup4(): + ordering.append('cleanup4') test.debug() - self.assertEqual(ordering, ['setUp', 'test', 'tearDown', 'cleanup1', 'cleanup2']) + self.assertEqual(ordering, ['setUp', 'test', 'tearDown', 'cleanup4', + 'cleanup3', 'cleanup1', 'cleanup2']) + + + def test_enterContext(self): + class TestableTest(unittest.TestCase): + def testNothing(self): + pass + + test = TestableTest('testNothing') + cleanups = [] + + test.addCleanup(cleanups.append, 'cleanup1') + cm = TestCM(cleanups, 42) + self.assertEqual(test.enterContext(cm), 42) + test.addCleanup(cleanups.append, 'cleanup2') + + self.assertTrue(test.doCleanups()) + self.assertEqual(cleanups, ['enter', 'cleanup2', 'exit', 'cleanup1']) + + def test_enterContext_arg_errors(self): + class TestableTest(unittest.TestCase): + def testNothing(self): + pass + + test = TestableTest('testNothing') + + with self.assertRaisesRegex(TypeError, 'the context manager'): + test.enterContext(LacksEnterAndExit()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + test.enterContext(LacksEnter()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + test.enterContext(LacksExit()) + + self.assertEqual(test._cleanups, []) class TestClassCleanup(unittest.TestCase): @@ -291,13 +362,14 @@ def testNothing(self): ordering.append('test') @classmethod def tearDownClass(cls): + ordering.append('tearDownClass') raise Exception('TearDownClassExc') suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestableTest) with self.assertRaises(Exception) as cm: suite.debug() self.assertEqual(str(cm.exception), 'TearDownClassExc') - self.assertEqual(ordering, ['setUpClass', 'test']) + self.assertEqual(ordering, ['setUpClass', 'test', 'tearDownClass']) self.assertTrue(TestableTest._class_cleanups) TestableTest._class_cleanups.clear() @@ -307,7 +379,7 @@ def tearDownClass(cls): with self.assertRaises(Exception) as cm: suite.debug() self.assertEqual(str(cm.exception), 'TearDownClassExc') - self.assertEqual(ordering, ['setUpClass', 'test']) + self.assertEqual(ordering, ['setUpClass', 'test', 'tearDownClass']) self.assertTrue(TestableTest._class_cleanups) TestableTest._class_cleanups.clear() @@ -446,6 +518,62 @@ def tearDownClass(cls): self.assertEqual(ordering, ['setUpClass', 'test', 'tearDownClass', 'cleanup_good']) + def test_enterClassContext(self): + class TestableTest(unittest.TestCase): + def testNothing(self): + pass + + cleanups = [] + + TestableTest.addClassCleanup(cleanups.append, 'cleanup1') + cm = TestCM(cleanups, 42) + self.assertEqual(TestableTest.enterClassContext(cm), 42) + TestableTest.addClassCleanup(cleanups.append, 'cleanup2') + + TestableTest.doClassCleanups() + self.assertEqual(cleanups, ['enter', 'cleanup2', 'exit', 'cleanup1']) + + def test_enterClassContext_arg_errors(self): + class TestableTest(unittest.TestCase): + def testNothing(self): + pass + + with self.assertRaisesRegex(TypeError, 'the context manager'): + TestableTest.enterClassContext(LacksEnterAndExit()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + TestableTest.enterClassContext(LacksEnter()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + TestableTest.enterClassContext(LacksExit()) + + self.assertEqual(TestableTest._class_cleanups, []) + + def test_run_nested_test(self): + ordering = [] + + class InnerTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + ordering.append('inner setup') + cls.addClassCleanup(ordering.append, 'inner cleanup') + def test(self): + ordering.append('inner test') + + class OuterTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + ordering.append('outer setup') + cls.addClassCleanup(ordering.append, 'outer cleanup') + def test(self): + ordering.append('start outer test') + runTests(InnerTest) + ordering.append('end outer test') + + runTests(OuterTest) + self.assertEqual(ordering, [ + 'outer setup', 'start outer test', + 'inner setup', 'inner test', 'inner cleanup', + 'end outer test', 'outer cleanup']) + class TestModuleCleanUp(unittest.TestCase): def test_add_and_do_ModuleCleanup(self): @@ -657,6 +785,7 @@ def setUpModule(): unittest.addModuleCleanup(cleanup, ordering) @staticmethod def tearDownModule(): + ordering.append('tearDownModule') raise Exception('CleanUpExc') class TestableTest(unittest.TestCase): @@ -675,7 +804,8 @@ def tearDownClass(cls): self.assertEqual(result.errors[0][1].splitlines()[-1], 'Exception: CleanUpExc') self.assertEqual(ordering, ['setUpModule', 'setUpClass', 'test', - 'tearDownClass', 'cleanup_good']) + 'tearDownClass', 'tearDownModule', + 'cleanup_good']) self.assertEqual(unittest.case._module_cleanups, []) def test_debug_module_executes_cleanUp(self): @@ -729,6 +859,7 @@ def setUpModule(): unittest.addModuleCleanup(cleanup, ordering, blowUp=blowUp) @staticmethod def tearDownModule(): + ordering.append('tearDownModule') raise Exception('TearDownModuleExc') class TestableTest(unittest.TestCase): @@ -748,7 +879,7 @@ def tearDownClass(cls): suite.debug() self.assertEqual(str(cm.exception), 'TearDownModuleExc') self.assertEqual(ordering, ['setUpModule', 'setUpClass', 'test', - 'tearDownClass']) + 'tearDownClass', 'tearDownModule']) self.assertTrue(unittest.case._module_cleanups) unittest.case._module_cleanups.clear() @@ -759,7 +890,7 @@ def tearDownClass(cls): suite.debug() self.assertEqual(str(cm.exception), 'TearDownModuleExc') self.assertEqual(ordering, ['setUpModule', 'setUpClass', 'test', - 'tearDownClass']) + 'tearDownClass', 'tearDownModule']) self.assertTrue(unittest.case._module_cleanups) unittest.case._module_cleanups.clear() @@ -995,6 +1126,31 @@ def tearDown(self): 'cleanup2', 'setUp2', 'test2', 'tearDown2', 'cleanup3', 'tearDownModule', 'cleanup1']) + def test_enterModuleContext(self): + cleanups = [] + + unittest.addModuleCleanup(cleanups.append, 'cleanup1') + cm = TestCM(cleanups, 42) + self.assertEqual(unittest.enterModuleContext(cm), 42) + unittest.addModuleCleanup(cleanups.append, 'cleanup2') + + unittest.case.doModuleCleanups() + self.assertEqual(cleanups, ['enter', 'cleanup2', 'exit', 'cleanup1']) + + def test_enterModuleContext_arg_errors(self): + class TestableTest(unittest.TestCase): + def testNothing(self): + pass + + with self.assertRaisesRegex(TypeError, 'the context manager'): + unittest.enterModuleContext(LacksEnterAndExit()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + unittest.enterModuleContext(LacksEnter()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + unittest.enterModuleContext(LacksExit()) + + self.assertEqual(unittest.case._module_cleanups, []) + class Test_TextTestRunner(unittest.TestCase): """Tests for TextTestRunner.""" @@ -1139,6 +1295,7 @@ def MockResultClass(*args): # TODO: RUSTPYTHON @unittest.expectedFailure + @support.requires_subprocess() def test_warnings(self): """ Check that warnings argument of TextTestRunner correctly affects the diff --git a/Lib/unittest/test/test_skipping.py b/Lib/unittest/test/test_skipping.py index 7cb9d33f5e..64ceeae37e 100644 --- a/Lib/unittest/test/test_skipping.py +++ b/Lib/unittest/test/test_skipping.py @@ -197,7 +197,7 @@ def tearDown(self): result = LoggingResult(events) test = Foo("test_skip_me") self.assertIs(test.run(result), result) - self.assertEqual(events, ['startTest', 'addSkip', 'addFailure', 'stopTest']) + self.assertEqual(events, ['startTest', 'addFailure', 'addSkip', 'stopTest']) self.assertEqual(result.skipped, [(test, "skip")]) def test_skipping_and_fail_in_cleanup(self): diff --git a/Lib/unittest/test/test_suite.py b/Lib/unittest/test/test_suite.py index bc2ce1e1c2..0551a16996 100644 --- a/Lib/unittest/test/test_suite.py +++ b/Lib/unittest/test/test_suite.py @@ -370,8 +370,7 @@ def test_nothing(self): del test # for the benefit of non-reference counting implementations - # XXX RUSTPYTHON TODO: gc module - # gc.collect() + gc.collect() self.assertEqual(suite._tests, [None]) self.assertIsNone(wref()) diff --git a/Lib/unittest/test/testmock/testasync.py b/Lib/unittest/test/testmock/testasync.py index bcf634bdd3..90ea72d82b 100644 --- a/Lib/unittest/test/testmock/testasync.py +++ b/Lib/unittest/test/testmock/testasync.py @@ -4,11 +4,14 @@ import re import unittest from contextlib import contextmanager +from test import support + +support.requires_working_socket(module=True) from asyncio import run, iscoroutinefunction from unittest import IsolatedAsyncioTestCase from unittest.mock import (ANY, call, AsyncMock, patch, MagicMock, Mock, - create_autospec, sentinel, _CallList) + create_autospec, sentinel, _CallList, seal) def tearDownModule(): @@ -146,6 +149,23 @@ async def test_async(): run(test_async()) + def test_patch_dict_async_def(self): + foo = {'a': 'a'} + @patch.dict(foo, {'a': 'b'}) + async def test_async(): + self.assertEqual(foo['a'], 'b') + + self.assertTrue(iscoroutinefunction(test_async)) + run(test_async()) + + def test_patch_dict_async_def_context(self): + foo = {'a': 'a'} + async def test_async(): + with patch.dict(foo, {'a': 'b'}): + self.assertEqual(foo['a'], 'b') + + run(test_async()) + class AsyncMockTest(unittest.TestCase): def test_iscoroutinefunction_default(self): @@ -173,8 +193,7 @@ def foo(): pass def test_future_isfuture(self): loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - fut = asyncio.Future() + fut = loop.create_future() loop.stop() loop.close() mock = AsyncMock(fut) @@ -281,6 +300,14 @@ def test_spec_normal_methods_on_class_with_mock(self): self.assertIsInstance(mock.async_method, AsyncMock) self.assertIsInstance(mock.normal_method, Mock) + def test_spec_normal_methods_on_class_with_mock_seal(self): + mock = Mock(AsyncClass) + seal(mock) + with self.assertRaises(AttributeError): + mock.normal_method + with self.assertRaises(AttributeError): + mock.async_method + def test_spec_mock_type_kw(self): def inner_test(mock_type): async_mock = mock_type(spec=async_func) @@ -699,7 +726,6 @@ def inner_test(mock_type): with self.subTest(f"test aiter and anext corourtine with {mock_type}"): inner_test(mock_type) - # TODO: RUSTPYTHON; async for """ def test_mock_async_for(self): @@ -1062,3 +1088,7 @@ async def f(x=None): pass 'Actual: [call(1)]'))) as cm: self.mock.assert_has_awaits([call(), call(1, 2)]) self.assertIsInstance(cm.exception.__cause__, TypeError) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/unittest/test/testmock/testmock.py b/Lib/unittest/test/testmock/testmock.py index fdba543b53..eaae22e854 100644 --- a/Lib/unittest/test/testmock/testmock.py +++ b/Lib/unittest/test/testmock/testmock.py @@ -226,6 +226,14 @@ class B(object): with self.assertRaisesRegex(InvalidSpecError, "Cannot spec attr 'B' as the spec_set "): mock.patch.object(A, 'B', spec_set=A.B).start() + with self.assertRaisesRegex(InvalidSpecError, + "Cannot spec attr 'B' as the spec_set "): + mock.patch.object(A, 'B', spec_set=A.B).start() + with self.assertRaisesRegex(InvalidSpecError, "Cannot spec a Mock object."): + mock.Mock(A.B) + with mock.patch('builtins.open', mock.mock_open()): + mock.mock_open() # should still be valid with open() mocked + def test_reset_mock(self): parent = Mock() @@ -1644,6 +1652,22 @@ def test_mock_unsafe(self): m.aseert_foo_call() m.assrt_foo_call() + # gh-100739 + def test_mock_safe_with_spec(self): + class Foo(object): + def assert_bar(self): + pass + + def assertSome(self): + pass + + m = Mock(spec=Foo) + m.assert_bar() + m.assertSome() + + m.assert_bar.assert_called_once() + m.assertSome.assert_called_once() + #Issue21262 def test_assert_not_called(self): m = Mock() diff --git a/Lib/unittest/test/testmock/testwith.py b/Lib/unittest/test/testmock/testwith.py index 42ebf3898c..c74d49a63c 100644 --- a/Lib/unittest/test/testmock/testwith.py +++ b/Lib/unittest/test/testmock/testwith.py @@ -130,8 +130,8 @@ def f(self): pass c = C() - with patch.object(c, 'f', autospec=True) as patch1: - with patch.object(c, 'f', autospec=True) as patch2: + with patch.object(c, 'f') as patch1: + with patch.object(c, 'f') as patch2: c.f() self.assertEqual(patch2.call_count, 1) self.assertEqual(patch1.call_count, 0) From 4a357ee6e9aca99264dd28b460dfc4d801a34b2a Mon Sep 17 00:00:00 2001 From: Jeong YunWon Date: Fri, 24 Feb 2023 14:29:45 +0900 Subject: [PATCH 2/2] adjust successful/failing tests from unittest/test --- Lib/unittest/test/test_async_case.py | 3 ++- Lib/unittest/test/test_runner.py | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Lib/unittest/test/test_async_case.py b/Lib/unittest/test/test_async_case.py index 6889c39127..98ec4f3345 100644 --- a/Lib/unittest/test/test_async_case.py +++ b/Lib/unittest/test/test_async_case.py @@ -37,7 +37,8 @@ async def __aenter__(self): pass -VAR = contextvars.ContextVar('VAR', default=()) +# TODO: RUSTPYTHON; used by following test suite +# VAR = contextvars.ContextVar('VAR', default=()) @unittest.skip("TODO: RUSTPYTHON; requires sys.get_coroutine_origin_tracking_depth()") diff --git a/Lib/unittest/test/test_runner.py b/Lib/unittest/test/test_runner.py index 7f24fccffb..50d9ed2fd3 100644 --- a/Lib/unittest/test/test_runner.py +++ b/Lib/unittest/test/test_runner.py @@ -1293,8 +1293,6 @@ def MockResultClass(*args): expectedresult = (runner.stream, DESCRIPTIONS, VERBOSITY) self.assertEqual(runner._makeResult(), expectedresult) - # TODO: RUSTPYTHON - @unittest.expectedFailure @support.requires_subprocess() def test_warnings(self): """