diff --git a/Lib/os.py b/Lib/os.py index 7ee7d695d9..b4c9f84c36 100644 --- a/Lib/os.py +++ b/Lib/os.py @@ -110,6 +110,7 @@ def _add(str, fn): _add("HAVE_FCHMODAT", "chmod") _add("HAVE_FCHOWNAT", "chown") _add("HAVE_FSTATAT", "stat") + _add("HAVE_LSTAT", "lstat") _add("HAVE_FUTIMESAT", "utime") _add("HAVE_LINKAT", "link") _add("HAVE_MKDIRAT", "mkdir") @@ -131,6 +132,7 @@ def _add(str, fn): _set = set() _add("HAVE_FCHDIR", "chdir") _add("HAVE_FCHMOD", "chmod") + _add("MS_WINDOWS", "chmod") _add("HAVE_FCHOWN", "chown") _add("HAVE_FDOPENDIR", "listdir") _add("HAVE_FDOPENDIR", "scandir") @@ -171,6 +173,7 @@ def _add(str, fn): _add("HAVE_FSTATAT", "stat") _add("HAVE_LCHFLAGS", "chflags") _add("HAVE_LCHMOD", "chmod") + _add("MS_WINDOWS", "chmod") if _exists("lchown"): # mac os x10.3 _add("HAVE_LCHOWN", "chown") _add("HAVE_LINKAT", "link") @@ -279,6 +282,10 @@ def renames(old, new): __all__.extend(["makedirs", "removedirs", "renames"]) +# Private sentinel that makes walk() classify all symlinks and junctions as +# regular files. +_walk_symlinks_as_files = object() + def walk(top, topdown=True, onerror=None, followlinks=False): """Directory tree generator. @@ -331,12 +338,12 @@ def walk(top, topdown=True, onerror=None, followlinks=False): import os from os.path import join, getsize - for root, dirs, files in os.walk('python/Lib/email'): + for root, dirs, files in os.walk('python/Lib/xml'): print(root, "consumes ") print(sum(getsize(join(root, name)) for name in files), end=" ") print("bytes in", len(files), "non-directory files") - if 'CVS' in dirs: - dirs.remove('CVS') # don't visit CVS directories + if '__pycache__' in dirs: + dirs.remove('__pycache__') # don't visit __pycache__ directories """ sys.audit("os.walk", top, topdown, onerror, followlinks) @@ -380,7 +387,10 @@ def walk(top, topdown=True, onerror=None, followlinks=False): break try: - is_dir = entry.is_dir() + if followlinks is _walk_symlinks_as_files: + is_dir = entry.is_dir(follow_symlinks=False) and not entry.is_junction() + else: + is_dir = entry.is_dir() except OSError: # If is_dir() raises an OSError, consider the entry not to # be a directory, same behaviour as os.path.isdir(). @@ -459,34 +469,69 @@ def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd= Example: import os - for root, dirs, files, rootfd in os.fwalk('python/Lib/email'): + for root, dirs, files, rootfd in os.fwalk('python/Lib/xml'): print(root, "consumes", end="") print(sum(os.stat(name, dir_fd=rootfd).st_size for name in files), end="") print("bytes in", len(files), "non-directory files") - if 'CVS' in dirs: - dirs.remove('CVS') # don't visit CVS directories + if '__pycache__' in dirs: + dirs.remove('__pycache__') # don't visit __pycache__ directories """ sys.audit("os.fwalk", top, topdown, onerror, follow_symlinks, dir_fd) top = fspath(top) - # Note: To guard against symlink races, we use the standard - # lstat()/open()/fstat() trick. - if not follow_symlinks: - orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd) - topfd = open(top, O_RDONLY | O_NONBLOCK, dir_fd=dir_fd) + stack = [(_fwalk_walk, (True, dir_fd, top, top, None))] + isbytes = isinstance(top, bytes) try: - if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and - path.samestat(orig_st, stat(topfd)))): - yield from _fwalk(topfd, top, isinstance(top, bytes), - topdown, onerror, follow_symlinks) + while stack: + yield from _fwalk(stack, isbytes, topdown, onerror, follow_symlinks) finally: - close(topfd) - - def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks): + # Close any file descriptors still on the stack. + while stack: + action, value = stack.pop() + if action == _fwalk_close: + close(value) + + # Each item in the _fwalk() stack is a pair (action, args). + _fwalk_walk = 0 # args: (isroot, dirfd, toppath, topname, entry) + _fwalk_yield = 1 # args: (toppath, dirnames, filenames, topfd) + _fwalk_close = 2 # args: dirfd + + def _fwalk(stack, isbytes, topdown, onerror, follow_symlinks): # Note: This uses O(depth of the directory tree) file descriptors: if # necessary, it can be adapted to only require O(1) FDs, see issue # #13734. + action, value = stack.pop() + if action == _fwalk_close: + close(value) + return + elif action == _fwalk_yield: + yield value + return + assert action == _fwalk_walk + isroot, dirfd, toppath, topname, entry = value + try: + if not follow_symlinks: + # Note: To guard against symlink races, we use the standard + # lstat()/open()/fstat() trick. + if entry is None: + orig_st = stat(topname, follow_symlinks=False, dir_fd=dirfd) + else: + orig_st = entry.stat(follow_symlinks=False) + topfd = open(topname, O_RDONLY | O_NONBLOCK, dir_fd=dirfd) + except OSError as err: + if isroot: + raise + if onerror is not None: + onerror(err) + return + stack.append((_fwalk_close, topfd)) + if not follow_symlinks: + if isroot and not st.S_ISDIR(orig_st.st_mode): + return + if not path.samestat(orig_st, stat(topfd)): + return + scandir_it = scandir(topfd) dirs = [] nondirs = [] @@ -512,31 +557,18 @@ def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks): if topdown: yield toppath, dirs, nondirs, topfd + else: + stack.append((_fwalk_yield, (toppath, dirs, nondirs, topfd))) - for name in dirs if entries is None else zip(dirs, entries): - try: - if not follow_symlinks: - if topdown: - orig_st = stat(name, dir_fd=topfd, follow_symlinks=False) - else: - assert entries is not None - name, entry = name - orig_st = entry.stat(follow_symlinks=False) - dirfd = open(name, O_RDONLY | O_NONBLOCK, dir_fd=topfd) - except OSError as err: - if onerror is not None: - onerror(err) - continue - try: - if follow_symlinks or path.samestat(orig_st, stat(dirfd)): - dirpath = path.join(toppath, name) - yield from _fwalk(dirfd, dirpath, isbytes, - topdown, onerror, follow_symlinks) - finally: - close(dirfd) - - if not topdown: - yield toppath, dirs, nondirs, topfd + toppath = path.join(toppath, toppath[:0]) # Add trailing slash. + if entries is None: + stack.extend( + (_fwalk_walk, (False, topfd, toppath + name, name, None)) + for name in dirs[::-1]) + else: + stack.extend( + (_fwalk_walk, (False, topfd, toppath + name, name, entry)) + for name, entry in zip(dirs[::-1], entries[::-1])) __all__.append("fwalk") @@ -1061,6 +1093,12 @@ def _fspath(path): else: raise TypeError("expected str, bytes or os.PathLike object, " "not " + path_type.__name__) + except TypeError: + if path_type.__fspath__ is None: + raise TypeError("expected str, bytes or os.PathLike object, " + "not " + path_type.__name__) from None + else: + raise if isinstance(path_repr, (str, bytes)): return path_repr else: @@ -1079,6 +1117,8 @@ class PathLike(abc.ABC): """Abstract base class for implementing the file system path protocol.""" + __slots__ = () + @abc.abstractmethod def __fspath__(self): """Return the file system path representation of the object.""" @@ -1128,3 +1168,17 @@ def add_dll_directory(path): cookie, nt._remove_dll_directory ) + + +if _exists('sched_getaffinity') and sys._get_cpu_count_config() < 0: + def process_cpu_count(): + """ + Get the number of CPUs of the current process. + + Return the number of logical CPUs usable by the calling thread of the + current process. Return None if indeterminable. + """ + return len(sched_getaffinity(0)) +else: + # Just an alias to cpu_count() (same docstring) + process_cpu_count = cpu_count diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 3b19d3d3cd..a025a2d4ff 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -13,7 +13,9 @@ import locale import os import pickle +import platform import select +import selectors import shutil import signal import socket @@ -33,7 +35,7 @@ from test.support import import_helper from test.support import os_helper from test.support import socket_helper -from test.support import set_recursion_limit +from test.support import infinite_recursion from test.support import warnings_helper from platform import win32_is_iot @@ -55,8 +57,10 @@ except (ImportError, AttributeError): all_users = [] try: + import _testcapi from _testcapi import INT_MAX, PY_SSIZE_T_MAX except ImportError: + _testcapi = None INT_MAX = PY_SSIZE_T_MAX = sys.maxsize try: @@ -184,6 +188,7 @@ def test_access(self): os.close(f) self.assertTrue(os.access(os_helper.TESTFN, os.W_OK)) + # TODO: RUSTPYTHON @unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, BrokenPipeError: (32, 'The process cannot access the file because it is being used by another process. (os error 32)')") @unittest.skipIf( support.is_emscripten, "Test is unstable under Emscripten." @@ -711,6 +716,7 @@ def check_file_attributes(self, result): self.assertTrue(isinstance(result.st_file_attributes, int)) self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.stat return value doesnt have st_file_attributes attribute") @unittest.skipUnless(sys.platform == "win32", "st_file_attributes is Win32 specific") @@ -733,6 +739,7 @@ def test_file_attributes(self): result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, stat.FILE_ATTRIBUTE_DIRECTORY) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.stat (PermissionError: [Errno 5] Access is denied.)") @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") def test_access_denied(self): @@ -756,6 +763,7 @@ def test_access_denied(self): self.assertNotEqual(result.st_size, 0) self.assertTrue(os.path.isfile(fname)) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.stat (PermissionError: [Errno 1] Incorrect function.)") @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") def test_stat_block_device(self): @@ -814,6 +822,7 @@ def _test_utime(self, set_time, filename=None): self.assertEqual(st.st_atime_ns, atime_ns) self.assertEqual(st.st_mtime_ns, mtime_ns) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: 2.002003 != 1.002003 within 1e-06 delta (1.0000000000000002 difference))") def test_utime(self): def set_time(filename, ns): @@ -826,12 +835,12 @@ def ns_to_sec(ns): # Convert a number of nanosecond (int) to a number of seconds (float). # Round towards infinity by adding 0.5 nanosecond to avoid rounding # issue, os.utime() rounds towards minus infinity. - # XXX: RUSTCPYTHON os.utime() use `[Duration::from_secs_f64](https://doc.rust-lang.org/std/time/struct.Duration.html#method.try_from_secs_f64)` + # XXX: RUSTPYTHON os.utime() use `[Duration::from_secs_f64](https://doc.rust-lang.org/std/time/struct.Duration.html#method.try_from_secs_f64)` # return (ns * 1e-9) + 0.5e-9 return (ns * 1e-9) def test_utime_by_indexed(self): - # pass times as floating point seconds as the second indexed parameter + # pass times as floating-point seconds as the second indexed parameter def set_time(filename, ns): atime_ns, mtime_ns = ns atime = self.ns_to_sec(atime_ns) @@ -880,6 +889,7 @@ def set_time(filename, ns): os.utime(name, dir_fd=dirfd, ns=ns) self._test_utime(set_time) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: 2.002003 != 1.002003 within 1e-06 delta (1.0000000000000002 difference))") def test_utime_directory(self): def set_time(filename, ns): @@ -909,6 +919,7 @@ def _test_utime_current(self, set_time): self.assertAlmostEqual(st.st_mtime, current, delta=delta, msg=msg) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: 3359485824.516508 != 1679742912.516503 within 0.05 delta (1679742912.000005 difference) : st_time=3359485824.516508, current=1679742912.516503, dt=1679742912.000005)") def test_utime_current(self): def set_time(filename): @@ -916,6 +927,7 @@ def set_time(filename): os.utime(self.fname) self._test_utime_current(set_time) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: 3359485824.5186944 != 1679742912.5186892 within 0.05 delta (1679742912.0000052 difference) : st_time=3359485824.5186944, current=1679742912.5186892, dt=1679742912.0000052)") def test_utime_current_old(self): def set_time(filename): @@ -923,6 +935,15 @@ def set_time(filename): os.utime(self.fname, None) self._test_utime_current(set_time) + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_utime_nonexistent(self): + now = time.time() + filename = 'nonexistent' + with self.assertRaises(FileNotFoundError) as cm: + os.utime(filename, (now, now)) + self.assertEqual(cm.exception.filename, filename) + def get_file_system(self, path): if sys.platform == 'win32': root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' @@ -936,6 +957,7 @@ def get_file_system(self, path): return buf.value # return None if the filesystem is unknown + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (ModuleNotFoundError: No module named '_ctypes')") def test_large_time(self): # Many filesystems are limited to the year 2038. At least, the test @@ -1149,6 +1171,7 @@ def test_putenv_unsetenv(self): stdout=subprocess.PIPE, text=True) self.assertEqual(proc.stdout.rstrip(), repr(None)) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: ValueError not raised by putenv)") # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). @support.requires_mac_ver(10, 6) @@ -1574,7 +1597,7 @@ def test_walk_many_open_files(self): def test_walk_above_recursion_limit(self): depth = 50 os.makedirs(os.path.join(self.walk_path, *(['d'] * depth))) - with set_recursion_limit(depth - 5): + with infinite_recursion(depth - 5): all = list(self.walk(self.walk_path)) sub2_path = self.sub2_tree[0] @@ -1671,6 +1694,9 @@ def test_yields_correct_dir_fd(self): @unittest.skipIf( support.is_emscripten, "Cannot dup stdout on Emscripten" ) + @unittest.skipIf( + support.is_android, "dup return value is unpredictable on Android" + ) def test_fd_leak(self): # Since we're opening a lot of FDs, we must be careful to avoid leaks: # we both check that calling fwalk() a large number of times doesn't @@ -1684,10 +1710,29 @@ def test_fd_leak(self): self.addCleanup(os.close, newfd) self.assertEqual(newfd, minfd) + @unittest.skipIf( + support.is_emscripten, "Cannot dup stdout on Emscripten" + ) + @unittest.skipIf( + support.is_android, "dup return value is unpredictable on Android" + ) + def test_fd_finalization(self): + # Check that close()ing the fwalk() generator closes FDs + def getfd(): + fd = os.dup(1) + os.close(fd) + return fd + for topdown in (False, True): + old_fd = getfd() + it = self.fwalk(os_helper.TESTFN, topdown=topdown) + self.assertEqual(getfd(), old_fd) + next(it) + self.assertGreater(getfd(), old_fd) + it.close() + self.assertEqual(getfd(), old_fd) + # fwalk() keeps file descriptors open test_walk_many_open_files = None - # fwalk() still uses recursion - test_walk_above_recursion_limit = None class BytesWalkTests(WalkTests): @@ -1703,37 +1748,32 @@ def walk(self, top, **kwargs): bdirs[:] = list(map(os.fsencode, dirs)) bfiles[:] = list(map(os.fsencode, files)) -@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") -class BytesFwalkTests(FwalkTests): - """Tests for os.walk() with bytes.""" - def fwalk(self, top='.', *args, **kwargs): - for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): - root = os.fsdecode(broot) - dirs = list(map(os.fsdecode, bdirs)) - files = list(map(os.fsdecode, bfiles)) - yield (root, dirs, files, topfd) - bdirs[:] = list(map(os.fsencode, dirs)) - bfiles[:] = list(map(os.fsencode, files)) - # TODO: RUSTPYTHON (TypeError: Can't mix strings and bytes in path components) @unittest.expectedFailure def test_compare_to_walk(self): - super().test_compare_to_walk() + return super().test_compare_to_walk() # TODO: RUSTPYTHON (TypeError: Can't mix strings and bytes in path components) @unittest.expectedFailure def test_dir_fd(self): - super().test_dir_fd() + return super().test_dir_fd() # TODO: RUSTPYTHON (TypeError: Can't mix strings and bytes in path components) @unittest.expectedFailure def test_yields_correct_dir_fd(self): - super().test_yields_correct_dir_fd() + return super().test_yields_correct_dir_fd() - # TODO: RUSTPYTHON (TypeError: Can't mix strings and bytes in path components) - @unittest.expectedFailure - def test_walk_bottom_up(self): - super().test_walk_bottom_up() +@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") +class BytesFwalkTests(FwalkTests): + """Tests for os.walk() with bytes.""" + def fwalk(self, top='.', *args, **kwargs): + for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): + root = os.fsdecode(broot) + dirs = list(map(os.fsdecode, bdirs)) + files = list(map(os.fsdecode, bfiles)) + yield (root, dirs, files, topfd) + bdirs[:] = list(map(os.fsencode, dirs)) + bfiles[:] = list(map(os.fsencode, files)) class MakedirTests(unittest.TestCase): @@ -1771,6 +1811,7 @@ def test_mode(self): self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.umask not implemented yet for all platforms") @unittest.skipIf( support.is_emscripten or support.is_wasi, @@ -1790,6 +1831,7 @@ def test_exist_ok_existing_directory(self): # Issue #25583: A drive root could raise PermissionError on Windows os.makedirs(os.path.abspath('/'), exist_ok=True) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.umask not implemented yet for all platforms") @unittest.skipIf( support.is_emscripten or support.is_wasi, @@ -1832,6 +1874,19 @@ def test_exist_ok_existing_regular_file(self): self.assertRaises(OSError, os.makedirs, path, exist_ok=True) os.remove(path) + @unittest.skipUnless(os.name == 'nt', "requires Windows") + def test_win32_mkdir_700(self): + base = os_helper.TESTFN + path = os.path.abspath(os.path.join(os_helper.TESTFN, 'dir')) + os.mkdir(path, mode=0o700) + out = subprocess.check_output(["cacls.exe", path, "/s"], encoding="oem") + os.rmdir(path) + out = out.strip().rsplit(" ", 1)[1] + self.assertEqual( + out, + '"D:P(A;OICI;FA;;;SY)(A;OICI;FA;;;BA)(A;OICI;FA;;;OW)"', + ) + def tearDown(self): path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', 'dir6') @@ -2064,6 +2119,7 @@ def test_urandom_failure(self): """ assert_python_ok('-c', code) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON on Windows (ModuleNotFoundError: No module named 'os')") def test_urandom_fd_closed(self): # Issue #21207: urandom() should reopen its fd to /dev/urandom if @@ -2079,6 +2135,7 @@ def test_urandom_fd_closed(self): """ rc, out, err = assert_python_ok('-Sc', code) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (ModuleNotFoundError: No module named 'os'") def test_urandom_fd_reopened(self): # Issue #21207: urandom() should detect its fd to /dev/urandom @@ -2167,6 +2224,7 @@ def test_execv_with_bad_arglist(self): self.assertRaises(ValueError, os.execv, 'notepad', ('',)) self.assertRaises(ValueError, os.execv, 'notepad', ['']) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.execve not implemented yet for all platforms") def test_execvpe_with_bad_arglist(self): self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) @@ -2227,6 +2285,7 @@ def test_internal_execvpe_str(self): if os.name != "nt": self._test_internal_execvpe(bytes) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.execve not implemented yet for all platforms") def test_execve_invalid_env(self): args = [sys.executable, '-c', 'pass'] @@ -2249,6 +2308,7 @@ def test_execve_invalid_env(self): with self.assertRaises(ValueError): os.execve(args[0], args, newenv) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.execve not implemented yet for all platforms") @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") def test_execve_with_empty_path(self): @@ -2301,12 +2361,15 @@ def test_chmod(self): class TestInvalidFD(unittest.TestCase): singles = ["fchdir", "dup", "fdatasync", "fstat", "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] + singles_fildes = {"fchdir", "fdatasync", "fsync"} #singles.append("close") #We omit close because it doesn't raise an exception on some platforms def get_single(f): def helper(self): if hasattr(os, f): self.check(getattr(os, f)) + if f in self.singles_fildes: + self.check_bool(getattr(os, f)) return helper for f in singles: locals()["test_"+f] = get_single(f) @@ -2320,9 +2383,18 @@ def check(self, f, *args, **kwargs): self.fail("%r didn't raise an OSError with a bad file descriptor" % f) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: didn't raise an OSError with a bad file descriptor)") + def check_bool(self, f, *args, **kwargs): + with warnings.catch_warnings(): + warnings.simplefilter("error", RuntimeWarning) + for fd in False, True: + with self.assertRaises(RuntimeWarning): + f(fd, *args, **kwargs) + + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_fdopen(self): self.check(os.fdopen, encoding="utf-8") + self.check_bool(os.fdopen, encoding="utf-8") @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') def test_isatty(self): @@ -2376,26 +2448,35 @@ def test_fchmod(self): def test_fchown(self): self.check(os.fchown, -1, -1) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') @unittest.skipIf( support.is_emscripten or support.is_wasi, "musl libc issue on Emscripten/WASI, bpo-46390" ) def test_fpathconf(self): + self.assertIn("PC_NAME_MAX", os.pathconf_names) self.check(os.pathconf, "PC_NAME_MAX") self.check(os.fpathconf, "PC_NAME_MAX") + self.check_bool(os.pathconf, "PC_NAME_MAX") + self.check_bool(os.fpathconf, "PC_NAME_MAX") - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: didn't raise an OSError with a bad file descriptor)") + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') def test_ftruncate(self): self.check(os.truncate, 0) self.check(os.ftruncate, 0) + self.check_bool(os.truncate, 0) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (OSError: [Errno 18] There are no more files.)") @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') def test_lseek(self): self.check(os.lseek, 0, 0) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (OSError: [Errno 18] There are no more files.)") @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') def test_read(self): @@ -2410,6 +2491,7 @@ def test_readv(self): def test_tcsetpgrpt(self): self.check(os.tcsetpgrp, 0) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (OSError: [Errno 18] There are no more files.)") @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') def test_write(self): @@ -2419,6 +2501,7 @@ def test_write(self): def test_writev(self): self.check(os.writev, [b'abc']) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.get_inheritable not implemented yet for all platforms") @support.requires_subprocess() def test_inheritable(self): @@ -2431,6 +2514,16 @@ def test_blocking(self): self.check(os.get_blocking) self.check(os.set_blocking, True) + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_fchdir(self): + return super().test_fchdir() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_fsync(self): + return super().test_fsync() + @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') class LinkTests(unittest.TestCase): @@ -2470,6 +2563,7 @@ def test_unicode_name(self): self.file2 = self.file1 + "2" self._test_link(self.file1, self.file2) + @unittest.skipIf(sys.platform == "win32", "Posix specific tests") class PosixUidGidTests(unittest.TestCase): # uid_t and gid_t are 32-bit unsigned integers on Linux @@ -2590,8 +2684,10 @@ def test_listdir(self): # test listdir without arguments current_directory = os.getcwd() try: - os.chdir(os.sep) - self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) + # The root directory is not readable on Android, so use a directory + # we created ourselves. + os.chdir(self.dir) + self.assertEqual(set(os.listdir()), expected) finally: os.chdir(current_directory) @@ -2669,11 +2765,13 @@ def _kill(self, sig): os.kill(proc.pid, sig) self.assertEqual(proc.wait(), sig) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (ModuleNotFoundError: No module named '_ctypes')") def test_kill_sigterm(self): # SIGTERM doesn't mean anything special, but make sure it works self._kill(signal.SIGTERM) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (ModuleNotFoundError: No module named '_ctypes')") def test_kill_int(self): # os.kill on Windows can take an int which gets set as the exit code @@ -3088,6 +3186,7 @@ def tearDown(self): if os.path.lexists(self.junction): os.unlink(self.junction) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AttributeError: module '_winapi' has no attribute 'CreateJunction')") def test_create_junction(self): _winapi.CreateJunction(self.junction_target, self.junction) @@ -3102,6 +3201,7 @@ def test_create_junction(self): self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), os.path.normcase(os.readlink(self.junction))) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AttributeError: module '_winapi' has no attribute 'CreateJunction')") def test_unlink_removes_junction(self): _winapi.CreateJunction(self.junction_target, self.junction) @@ -3116,7 +3216,8 @@ class Win32NtTests(unittest.TestCase): def test_getfinalpathname_handles(self): nt = import_helper.import_module('nt') ctypes = import_helper.import_module('ctypes') - import ctypes.wintypes + # Ruff false positive -- it thinks we're redefining `ctypes` here + import ctypes.wintypes # noqa: F811 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE @@ -3161,6 +3262,7 @@ def test_getfinalpathname_handles(self): self.assertEqual(0, handle_delta) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.stat (PermissionError: [Errno 5] Access is denied.)") @support.requires_subprocess() def test_stat_unlink_race(self): @@ -3229,9 +3331,8 @@ def test_stat_inaccessible_file(self): self.skipTest("Unable to create inaccessible file") def cleanup(): - # Give delete permission. We are the file owner, so we can do this - # even though we removed all permissions earlier. - subprocess.check_output([ICACLS, filename, "/grant", "Everyone:(D)"], + # Give delete permission to the owner (us) + subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"], stderr=subprocess.STDOUT) os.unlink(filename) @@ -3372,6 +3473,7 @@ def test_waitstatus_to_exitcode(self): with self.assertRaises(TypeError): os.waitstatus_to_exitcode(0.0) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.spawnv not implemented yet for all platforms") @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') def test_waitpid_windows(self): @@ -3381,6 +3483,7 @@ def test_waitpid_windows(self): code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (OverflowError: Python int too large to convert to Rust i32)") @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') def test_waitstatus_to_exitcode_windows(self): @@ -3506,7 +3609,7 @@ def test_nowait(self): support.wait_process(pid, exitcode=self.exitcode) # TODO: RUSTPYTHON fix spawnv bytes - @unittest.expectedFailure + @unittest.expectedFailure @requires_os_func('spawnve') def test_spawnve_bytes(self): # Test bytes handling in parse_arglist and parse_envlist (#28114) @@ -3637,9 +3740,8 @@ def test_set_get_priority(self): class TestSendfile(unittest.IsolatedAsyncioTestCase): DATA = b"12345abcde" * 16 * 1024 # 160 KiB - SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ - not sys.platform.startswith("solaris") and \ - not sys.platform.startswith("sunos") + SUPPORT_HEADERS_TRAILERS = ( + not sys.platform.startswith(("linux", "android", "solaris", "sunos"))) requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support') requires_32b = unittest.skipUnless(sys.maxsize < 2**32, @@ -3912,10 +4014,10 @@ def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwa xattr.remove("user.test") self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") - setxattr(fn, s("user.test"), b"a"*1024, **kwargs) - self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) + setxattr(fn, s("user.test"), b"a"*256, **kwargs) + self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*256) removexattr(fn, s("user.test"), **kwargs) - many = sorted("user.test{}".format(i) for i in range(100)) + many = sorted("user.test{}".format(i) for i in range(32)) for thing in many: setxattr(fn, thing, b"x", **kwargs) self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) @@ -3962,7 +4064,12 @@ def test_does_not_crash(self): try: size = os.get_terminal_size() except OSError as e: - if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): + known_errnos = [errno.EINVAL, errno.ENOTTY] + if sys.platform == "android": + # The Android testbed redirects the native stdout to a pipe, + # which returns a different error code. + known_errnos.append(errno.EACCES) + if sys.platform == "win32" or e.errno in known_errnos: # Under win32 a generic OSError can be thrown if the # handle cannot be retrieved self.skipTest("failed to query terminal size") @@ -3971,6 +4078,7 @@ def test_does_not_crash(self): self.assertGreaterEqual(size.columns, 0) self.assertGreaterEqual(size.lines, 0) + @support.requires_subprocess() def test_stty_match(self): """Check if stty returns the same results @@ -4112,6 +4220,341 @@ def test_eventfd_select(self): self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) os.eventfd_read(fd) +@unittest.skipUnless(hasattr(os, 'timerfd_create'), 'requires os.timerfd_create') +@unittest.skipIf(sys.platform == "android", "gh-124873: Test is flaky on Android") +@support.requires_linux_version(2, 6, 30) +class TimerfdTests(unittest.TestCase): + # 1 ms accuracy is reliably achievable on every platform except Android + # emulators, where we allow 10 ms (gh-108277). + + # XXX: RUSTPYTHON; AttributeError: module 'platform' has no attribute 'android_ver' + #if sys.platform == "android" and platform.android_ver().is_emulator: + if sys.platform == "android": + CLOCK_RES_PLACES = 2 + else: + CLOCK_RES_PLACES = 3 + + CLOCK_RES = 10 ** -CLOCK_RES_PLACES + CLOCK_RES_NS = 10 ** (9 - CLOCK_RES_PLACES) + + def timerfd_create(self, *args, **kwargs): + fd = os.timerfd_create(*args, **kwargs) + self.assertGreaterEqual(fd, 0) + self.assertFalse(os.get_inheritable(fd)) + self.addCleanup(os.close, fd) + return fd + + def read_count_signaled(self, fd): + # read 8 bytes + data = os.read(fd, 8) + return int.from_bytes(data, byteorder=sys.byteorder) + + def test_timerfd_initval(self): + fd = self.timerfd_create(time.CLOCK_REALTIME) + + initial_expiration = 0.25 + interval = 0.125 + + # 1st call + next_expiration, interval2 = os.timerfd_settime(fd, initial=initial_expiration, interval=interval) + self.assertAlmostEqual(interval2, 0.0, places=self.CLOCK_RES_PLACES) + self.assertAlmostEqual(next_expiration, 0.0, places=self.CLOCK_RES_PLACES) + + # 2nd call + next_expiration, interval2 = os.timerfd_settime(fd, initial=initial_expiration, interval=interval) + self.assertAlmostEqual(interval2, interval, places=self.CLOCK_RES_PLACES) + self.assertAlmostEqual(next_expiration, initial_expiration, places=self.CLOCK_RES_PLACES) + + # timerfd_gettime + next_expiration, interval2 = os.timerfd_gettime(fd) + self.assertAlmostEqual(interval2, interval, places=self.CLOCK_RES_PLACES) + self.assertAlmostEqual(next_expiration, initial_expiration, places=self.CLOCK_RES_PLACES) + + def test_timerfd_non_blocking(self): + fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK) + + # 0.1 second later + initial_expiration = 0.1 + os.timerfd_settime(fd, initial=initial_expiration, interval=0) + + # read() raises OSError with errno is EAGAIN for non-blocking timer. + with self.assertRaises(OSError) as ctx: + self.read_count_signaled(fd) + self.assertEqual(ctx.exception.errno, errno.EAGAIN) + + # Wait more than 0.1 seconds + time.sleep(initial_expiration + 0.1) + + # confirm if timerfd is readable and read() returns 1 as bytes. + self.assertEqual(self.read_count_signaled(fd), 1) + + @unittest.skipIf(sys.platform.startswith('netbsd'), + "gh-131263: Skip on NetBSD due to system freeze " + "with negative timer values") + def test_timerfd_negative(self): + one_sec_in_nsec = 10**9 + fd = self.timerfd_create(time.CLOCK_REALTIME) + + test_flags = [0, os.TFD_TIMER_ABSTIME] + if hasattr(os, 'TFD_TIMER_CANCEL_ON_SET'): + test_flags.append(os.TFD_TIMER_ABSTIME | os.TFD_TIMER_CANCEL_ON_SET) + + # Any of 'initial' and 'interval' is negative value. + for initial, interval in ( (-1, 0), (1, -1), (-1, -1), (-0.1, 0), (1, -0.1), (-0.1, -0.1)): + for flags in test_flags: + with self.subTest(flags=flags, initial=initial, interval=interval): + with self.assertRaises(OSError) as context: + os.timerfd_settime(fd, flags=flags, initial=initial, interval=interval) + self.assertEqual(context.exception.errno, errno.EINVAL) + + with self.assertRaises(OSError) as context: + initial_ns = int( one_sec_in_nsec * initial ) + interval_ns = int( one_sec_in_nsec * interval ) + os.timerfd_settime_ns(fd, flags=flags, initial=initial_ns, interval=interval_ns) + self.assertEqual(context.exception.errno, errno.EINVAL) + + def test_timerfd_interval(self): + fd = self.timerfd_create(time.CLOCK_REALTIME) + + # 1 second + initial_expiration = 1 + # 0.5 second + interval = 0.5 + + os.timerfd_settime(fd, initial=initial_expiration, interval=interval) + + # timerfd_gettime + next_expiration, interval2 = os.timerfd_gettime(fd) + self.assertAlmostEqual(interval2, interval, places=self.CLOCK_RES_PLACES) + self.assertAlmostEqual(next_expiration, initial_expiration, places=self.CLOCK_RES_PLACES) + + count = 3 + t = time.perf_counter() + for _ in range(count): + self.assertEqual(self.read_count_signaled(fd), 1) + t = time.perf_counter() - t + + total_time = initial_expiration + interval * (count - 1) + self.assertGreater(t, total_time - self.CLOCK_RES) + + # wait 3.5 time of interval + time.sleep( (count+0.5) * interval) + self.assertEqual(self.read_count_signaled(fd), count) + + def test_timerfd_TFD_TIMER_ABSTIME(self): + fd = self.timerfd_create(time.CLOCK_REALTIME) + + now = time.clock_gettime(time.CLOCK_REALTIME) + + # 1 second later from now. + offset = 1 + initial_expiration = now + offset + # not interval timer + interval = 0 + + os.timerfd_settime(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration, interval=interval) + + # timerfd_gettime + # Note: timerfd_gettime returns relative values even if TFD_TIMER_ABSTIME is specified. + next_expiration, interval2 = os.timerfd_gettime(fd) + self.assertAlmostEqual(interval2, interval, places=self.CLOCK_RES_PLACES) + self.assertAlmostEqual(next_expiration, offset, places=self.CLOCK_RES_PLACES) + + t = time.perf_counter() + count_signaled = self.read_count_signaled(fd) + t = time.perf_counter() - t + self.assertEqual(count_signaled, 1) + + self.assertGreater(t, offset - self.CLOCK_RES) + + def test_timerfd_select(self): + fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK) + + rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) + self.assertEqual((rfd, wfd, xfd), ([], [], [])) + + # 0.25 second + initial_expiration = 0.25 + # every 0.125 second + interval = 0.125 + + os.timerfd_settime(fd, initial=initial_expiration, interval=interval) + + count = 3 + t = time.perf_counter() + for _ in range(count): + rfd, wfd, xfd = select.select([fd], [fd], [fd], initial_expiration + interval) + self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) + self.assertEqual(self.read_count_signaled(fd), 1) + t = time.perf_counter() - t + + total_time = initial_expiration + interval * (count - 1) + self.assertGreater(t, total_time - self.CLOCK_RES) + + def check_timerfd_poll(self, nanoseconds): + fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK) + + selector = selectors.DefaultSelector() + selector.register(fd, selectors.EVENT_READ) + self.addCleanup(selector.close) + + sec_to_nsec = 10 ** 9 + # 0.25 second + initial_expiration_ns = sec_to_nsec // 4 + # every 0.125 second + interval_ns = sec_to_nsec // 8 + + if nanoseconds: + os.timerfd_settime_ns(fd, + initial=initial_expiration_ns, + interval=interval_ns) + else: + os.timerfd_settime(fd, + initial=initial_expiration_ns / sec_to_nsec, + interval=interval_ns / sec_to_nsec) + + count = 3 + if nanoseconds: + t = time.perf_counter_ns() + else: + t = time.perf_counter() + for i in range(count): + timeout_margin_ns = interval_ns + if i == 0: + timeout_ns = initial_expiration_ns + interval_ns + timeout_margin_ns + else: + timeout_ns = interval_ns + timeout_margin_ns + + ready = selector.select(timeout_ns / sec_to_nsec) + self.assertEqual(len(ready), 1, ready) + event = ready[0][1] + self.assertEqual(event, selectors.EVENT_READ) + + self.assertEqual(self.read_count_signaled(fd), 1) + + total_time = initial_expiration_ns + interval_ns * (count - 1) + if nanoseconds: + dt = time.perf_counter_ns() - t + self.assertGreater(dt, total_time - self.CLOCK_RES_NS) + else: + dt = time.perf_counter() - t + self.assertGreater(dt, total_time / sec_to_nsec - self.CLOCK_RES) + selector.unregister(fd) + + def test_timerfd_poll(self): + self.check_timerfd_poll(False) + + def test_timerfd_ns_poll(self): + self.check_timerfd_poll(True) + + def test_timerfd_ns_initval(self): + one_sec_in_nsec = 10**9 + limit_error = one_sec_in_nsec // 10**3 + fd = self.timerfd_create(time.CLOCK_REALTIME) + + # 1st call + initial_expiration_ns = 0 + interval_ns = one_sec_in_nsec // 1000 + next_expiration_ns, interval_ns2 = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns) + self.assertEqual(interval_ns2, 0) + self.assertEqual(next_expiration_ns, 0) + + # 2nd call + next_expiration_ns, interval_ns2 = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns) + self.assertEqual(interval_ns2, interval_ns) + self.assertEqual(next_expiration_ns, initial_expiration_ns) + + # timerfd_gettime + next_expiration_ns, interval_ns2 = os.timerfd_gettime_ns(fd) + self.assertEqual(interval_ns2, interval_ns) + self.assertLessEqual(next_expiration_ns, initial_expiration_ns) + + self.assertAlmostEqual(next_expiration_ns, initial_expiration_ns, delta=limit_error) + + def test_timerfd_ns_interval(self): + one_sec_in_nsec = 10**9 + limit_error = one_sec_in_nsec // 10**3 + fd = self.timerfd_create(time.CLOCK_REALTIME) + + # 1 second + initial_expiration_ns = one_sec_in_nsec + # every 0.5 second + interval_ns = one_sec_in_nsec // 2 + + os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns) + + # timerfd_gettime + next_expiration_ns, interval_ns2 = os.timerfd_gettime_ns(fd) + self.assertEqual(interval_ns2, interval_ns) + self.assertLessEqual(next_expiration_ns, initial_expiration_ns) + + count = 3 + t = time.perf_counter_ns() + for _ in range(count): + self.assertEqual(self.read_count_signaled(fd), 1) + t = time.perf_counter_ns() - t + + total_time_ns = initial_expiration_ns + interval_ns * (count - 1) + self.assertGreater(t, total_time_ns - self.CLOCK_RES_NS) + + # wait 3.5 time of interval + time.sleep( (count+0.5) * interval_ns / one_sec_in_nsec) + self.assertEqual(self.read_count_signaled(fd), count) + + + def test_timerfd_ns_TFD_TIMER_ABSTIME(self): + one_sec_in_nsec = 10**9 + limit_error = one_sec_in_nsec // 10**3 + fd = self.timerfd_create(time.CLOCK_REALTIME) + + now_ns = time.clock_gettime_ns(time.CLOCK_REALTIME) + + # 1 second later from now. + offset_ns = one_sec_in_nsec + initial_expiration_ns = now_ns + offset_ns + # not interval timer + interval_ns = 0 + + os.timerfd_settime_ns(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration_ns, interval=interval_ns) + + # timerfd_gettime + # Note: timerfd_gettime returns relative values even if TFD_TIMER_ABSTIME is specified. + next_expiration_ns, interval_ns2 = os.timerfd_gettime_ns(fd) + self.assertLess(abs(interval_ns2 - interval_ns), limit_error) + self.assertLess(abs(next_expiration_ns - offset_ns), limit_error) + + t = time.perf_counter_ns() + count_signaled = self.read_count_signaled(fd) + t = time.perf_counter_ns() - t + self.assertEqual(count_signaled, 1) + + self.assertGreater(t, offset_ns - self.CLOCK_RES_NS) + + def test_timerfd_ns_select(self): + one_sec_in_nsec = 10**9 + + fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK) + + rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) + self.assertEqual((rfd, wfd, xfd), ([], [], [])) + + # 0.25 second + initial_expiration_ns = one_sec_in_nsec // 4 + # every 0.125 second + interval_ns = one_sec_in_nsec // 8 + + os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns) + + count = 3 + t = time.perf_counter_ns() + for _ in range(count): + rfd, wfd, xfd = select.select([fd], [fd], [fd], (initial_expiration_ns + interval_ns) / 1e9 ) + self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) + self.assertEqual(self.read_count_signaled(fd), 1) + t = time.perf_counter_ns() - t + + total_time_ns = initial_expiration_ns + interval_ns * (count - 1) + self.assertGreater(t, total_time_ns - self.CLOCK_RES_NS) class OSErrorTests(unittest.TestCase): def setUp(self): @@ -4188,18 +4631,47 @@ def test_oserror_filename(self): self.fail(f"No exception thrown by {func}") class CPUCountTests(unittest.TestCase): + def check_cpu_count(self, cpus): + if cpus is None: + self.skipTest("Could not determine the number of CPUs") + + self.assertIsInstance(cpus, int) + self.assertGreater(cpus, 0) + def test_cpu_count(self): cpus = os.cpu_count() - if cpus is not None: - self.assertIsInstance(cpus, int) - self.assertGreater(cpus, 0) - else: + self.check_cpu_count(cpus) + + def test_process_cpu_count(self): + cpus = os.process_cpu_count() + self.assertLessEqual(cpus, os.cpu_count()) + self.check_cpu_count(cpus) + + @unittest.skipUnless(hasattr(os, 'sched_setaffinity'), + "don't have sched affinity support") + def test_process_cpu_count_affinity(self): + affinity1 = os.process_cpu_count() + if affinity1 is None: self.skipTest("Could not determine the number of CPUs") + # Disable one CPU + mask = os.sched_getaffinity(0) + if len(mask) <= 1: + self.skipTest(f"sched_getaffinity() returns less than " + f"2 CPUs: {sorted(mask)}") + self.addCleanup(os.sched_setaffinity, 0, list(mask)) + mask.pop() + os.sched_setaffinity(0, mask) + + # test process_cpu_count() + affinity2 = os.process_cpu_count() + self.assertEqual(affinity2, affinity1 - 1) + # FD inheritance check is only useful for systems with process support. @support.requires_subprocess() class FDInheritanceTests(unittest.TestCase): + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.get_inheritable not implemented yet for all platforms") def test_get_set_inheritable(self): fd = os.open(__file__, os.O_RDONLY) @@ -4245,6 +4717,7 @@ def test_get_set_inheritable_o_path(self): os.set_inheritable(fd, False) self.assertEqual(os.get_inheritable(fd), False) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.get_inheritable not implemented yet for all platforms") def test_get_set_inheritable_badf(self): fd = os_helper.make_bad_fd() @@ -4261,6 +4734,7 @@ def test_get_set_inheritable_badf(self): os.set_inheritable(fd, False) self.assertEqual(ctx.exception.errno, errno.EBADF) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.get_inheritable not implemented yet for all platforms") def test_open(self): fd = os.open(__file__, os.O_RDONLY) @@ -4275,6 +4749,7 @@ def test_pipe(self): self.assertEqual(os.get_inheritable(rfd), False) self.assertEqual(os.get_inheritable(wfd), False) + # TODO: RUSTPYTHON @unittest.skipIf(sys.platform == 'win32', "TODO: RUSTPYTHON; os.dup on windows") def test_dup(self): fd1 = os.open(__file__, os.O_RDONLY) @@ -4284,12 +4759,14 @@ def test_dup(self): self.addCleanup(os.close, fd2) self.assertEqual(os.get_inheritable(fd2), False) + # TODO: RUSTPYTHON @unittest.skipIf(sys.platform == 'win32', "TODO: RUSTPYTHON; os.dup on windows") def test_dup_standard_stream(self): fd = os.dup(1) self.addCleanup(os.close, fd) self.assertGreater(fd, 0) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON os.dup not implemented yet for all platforms") @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') def test_dup_nul(self): @@ -4317,13 +4794,105 @@ def test_dup2(self): self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3) self.assertFalse(os.get_inheritable(fd3)) - @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") +@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") +class PseudoterminalTests(unittest.TestCase): + def open_pty(self): + """Open a pty fd-pair, and schedule cleanup for it""" + main_fd, second_fd = os.openpty() + self.addCleanup(os.close, main_fd) + self.addCleanup(os.close, second_fd) + return main_fd, second_fd + def test_openpty(self): - master_fd, slave_fd = os.openpty() - self.addCleanup(os.close, master_fd) - self.addCleanup(os.close, slave_fd) - self.assertEqual(os.get_inheritable(master_fd), False) - self.assertEqual(os.get_inheritable(slave_fd), False) + main_fd, second_fd = self.open_pty() + self.assertEqual(os.get_inheritable(main_fd), False) + self.assertEqual(os.get_inheritable(second_fd), False) + + @unittest.skipUnless(hasattr(os, 'ptsname'), "need os.ptsname()") + @unittest.skipUnless(hasattr(os, 'O_RDWR'), "need os.O_RDWR") + @unittest.skipUnless(hasattr(os, 'O_NOCTTY'), "need os.O_NOCTTY") + def test_open_via_ptsname(self): + main_fd, second_fd = self.open_pty() + second_path = os.ptsname(main_fd) + reopened_second_fd = os.open(second_path, os.O_RDWR|os.O_NOCTTY) + self.addCleanup(os.close, reopened_second_fd) + os.write(reopened_second_fd, b'foo') + self.assertEqual(os.read(main_fd, 3), b'foo') + + @unittest.skipUnless(hasattr(os, 'posix_openpt'), "need os.posix_openpt()") + @unittest.skipUnless(hasattr(os, 'grantpt'), "need os.grantpt()") + @unittest.skipUnless(hasattr(os, 'unlockpt'), "need os.unlockpt()") + @unittest.skipUnless(hasattr(os, 'ptsname'), "need os.ptsname()") + @unittest.skipUnless(hasattr(os, 'O_RDWR'), "need os.O_RDWR") + @unittest.skipUnless(hasattr(os, 'O_NOCTTY'), "need os.O_NOCTTY") + def test_posix_pty_functions(self): + mother_fd = os.posix_openpt(os.O_RDWR|os.O_NOCTTY) + self.addCleanup(os.close, mother_fd) + os.grantpt(mother_fd) + os.unlockpt(mother_fd) + son_path = os.ptsname(mother_fd) + son_fd = os.open(son_path, os.O_RDWR|os.O_NOCTTY) + self.addCleanup(os.close, son_fd) + self.assertEqual(os.ptsname(mother_fd), os.ttyname(son_fd)) + + @unittest.skipUnless(hasattr(os, 'spawnl'), "need os.spawnl()") + @support.requires_subprocess() + def test_pipe_spawnl(self): + # gh-77046: On Windows, os.pipe() file descriptors must be created with + # _O_NOINHERIT to make them non-inheritable. UCRT has no public API to + # get (_osfile(fd) & _O_NOINHERIT), so use a functional test. + # + # Make sure that fd is not inherited by a child process created by + # os.spawnl(): get_osfhandle() and dup() must fail with EBADF. + + fd, fd2 = os.pipe() + self.addCleanup(os.close, fd) + self.addCleanup(os.close, fd2) + + code = textwrap.dedent(f""" + import errno + import os + import test.support + try: + import msvcrt + except ImportError: + msvcrt = None + + fd = {fd} + + with test.support.SuppressCrashReport(): + if msvcrt is not None: + try: + handle = msvcrt.get_osfhandle(fd) + except OSError as exc: + if exc.errno != errno.EBADF: + raise + # get_osfhandle(fd) failed with EBADF as expected + else: + raise Exception("get_osfhandle() must fail") + + try: + fd3 = os.dup(fd) + except OSError as exc: + if exc.errno != errno.EBADF: + raise + # os.dup(fd) failed with EBADF as expected + else: + os.close(fd3) + raise Exception("dup must fail") + """) + + filename = os_helper.TESTFN + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + with open(filename, "w") as fp: + print(code, file=fp, end="") + + executable = sys.executable + cmd = [executable, filename] + if os.name == "nt" and " " in cmd[0]: + cmd[0] = f'"{cmd[0]}"' + exitcode = os.spawnl(os.P_WAIT, executable, *cmd) + self.assertEqual(exitcode, 0) class PathTConverterTests(unittest.TestCase): @@ -4519,9 +5088,10 @@ def check_entry(self, entry, name, is_dir, is_file, is_symlink): entry_lstat, os.name == 'nt') + # TODO: RUSTPYTHON @unittest.skipIf(sys.platform == "linux", "TODO: RUSTPYTHON, flaky test") def test_attributes(self): - link = hasattr(os, 'link') + link = os_helper.can_hardlink() symlink = os_helper.can_symlink() dirname = os.path.join(self.path, "dir") @@ -4619,6 +5189,7 @@ def test_fspath_protocol_bytes(self): self.assertEqual(fspath, os.path.join(os.fsencode(self.path),bytes_filename)) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON entry.is_dir() is False") def test_removed_dir(self): path = os.path.join(self.path, 'dir') @@ -4642,6 +5213,7 @@ def test_removed_dir(self): self.assertRaises(FileNotFoundError, entry.stat) self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON entry.is_file() is False") def test_removed_file(self): entry = self.create_file_entry() @@ -4731,6 +5303,7 @@ def test_fd(self): st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) self.assertEqual(entry.stat(follow_symlinks=False), st) + # TODO: RUSTPYTHON @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON (AssertionError: FileNotFoundError not raised by scandir)") @unittest.skipIf(support.is_wasi, "WASI maps '' to cwd") def test_empty_path(self): @@ -4861,6 +5434,55 @@ class A(os.PathLike): def test_pathlike_class_getitem(self): self.assertIsInstance(os.PathLike[bytes], types.GenericAlias) + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_pathlike_subclass_slots(self): + class A(os.PathLike): + __slots__ = () + def __fspath__(self): + return '' + self.assertFalse(hasattr(A(), '__dict__')) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_fspath_set_to_None(self): + class Foo: + __fspath__ = None + + class Bar: + def __fspath__(self): + return 'bar' + + class Baz(Bar): + __fspath__ = None + + good_error_msg = ( + r"expected str, bytes or os.PathLike object, not {}".format + ) + + with self.assertRaisesRegex(TypeError, good_error_msg("Foo")): + self.fspath(Foo()) + + self.assertEqual(self.fspath(Bar()), 'bar') + + with self.assertRaisesRegex(TypeError, good_error_msg("Baz")): + self.fspath(Baz()) + + with self.assertRaisesRegex(TypeError, good_error_msg("Foo")): + open(Foo()) + + with self.assertRaisesRegex(TypeError, good_error_msg("Baz")): + open(Baz()) + + other_good_error_msg = ( + r"should be string, bytes or os.PathLike, not {}".format + ) + + with self.assertRaisesRegex(TypeError, other_good_error_msg("Foo")): + os.rename(Foo(), "foooo") + + with self.assertRaisesRegex(TypeError, other_good_error_msg("Baz")): + os.rename(Baz(), "bazzz") class TimesTests(unittest.TestCase): def test_times(self): @@ -4891,12 +5513,14 @@ def test_fork(self): support.wait_process(pid, exitcode=0) """ assert_python_ok("-c", code) - assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug") + if support.Py_GIL_DISABLED: + assert_python_ok("-c", code, PYTHONMALLOC="mimalloc_debug") + else: + assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug") - # TODO: RUSTPYTHON; requires _testcapi - @unittest.expectedFailure - @unittest.skipUnless(sys.platform in ("linux", "darwin"), + @unittest.skipUnless(sys.platform in ("linux", "android", "darwin"), "Only Linux and macOS detect this today.") + @unittest.skipIf(_testcapi is None, "requires _testcapi") def test_fork_warns_when_non_python_thread_exists(self): code = """if 1: import os, threading, warnings diff --git a/vm/src/stdlib/posix.rs b/vm/src/stdlib/posix.rs index 3cf9050f70..e9ccf307df 100644 --- a/vm/src/stdlib/posix.rs +++ b/vm/src/stdlib/posix.rs @@ -62,11 +62,14 @@ pub mod module { #[cfg(not(any(target_os = "redox", target_os = "freebsd")))] #[pyattr] use libc::O_DSYNC; + #[pyattr] use libc::{O_CLOEXEC, O_NONBLOCK, WNOHANG}; + #[cfg(target_os = "macos")] #[pyattr] use libc::{O_EVTONLY, O_FSYNC, O_NOFOLLOW_ANY, O_SYMLINK}; + #[cfg(not(target_os = "redox"))] #[pyattr] use libc::{O_NDELAY, O_NOCTTY}; @@ -80,34 +83,49 @@ pub mod module { #[pyattr] const EX_OK: i8 = exitcode::OK as i8; + #[pyattr] const EX_USAGE: i8 = exitcode::USAGE as i8; + #[pyattr] const EX_DATAERR: i8 = exitcode::DATAERR as i8; + #[pyattr] const EX_NOINPUT: i8 = exitcode::NOINPUT as i8; + #[pyattr] const EX_NOUSER: i8 = exitcode::NOUSER as i8; + #[pyattr] const EX_NOHOST: i8 = exitcode::NOHOST as i8; + #[pyattr] const EX_UNAVAILABLE: i8 = exitcode::UNAVAILABLE as i8; + #[pyattr] const EX_SOFTWARE: i8 = exitcode::SOFTWARE as i8; + #[pyattr] const EX_OSERR: i8 = exitcode::OSERR as i8; + #[pyattr] const EX_OSFILE: i8 = exitcode::OSFILE as i8; + #[pyattr] const EX_CANTCREAT: i8 = exitcode::CANTCREAT as i8; + #[pyattr] const EX_IOERR: i8 = exitcode::IOERR as i8; + #[pyattr] const EX_TEMPFAIL: i8 = exitcode::TEMPFAIL as i8; + #[pyattr] const EX_PROTOCOL: i8 = exitcode::PROTOCOL as i8; + #[pyattr] const EX_NOPERM: i8 = exitcode::NOPERM as i8; + #[pyattr] const EX_CONFIG: i8 = exitcode::CONFIG as i8; @@ -122,6 +140,7 @@ pub mod module { ))] #[pyattr] const SCHED_RR: i32 = libc::SCHED_RR; + #[cfg(any( target_os = "macos", target_os = "linux", @@ -133,6 +152,7 @@ pub mod module { ))] #[pyattr] const SCHED_FIFO: i32 = libc::SCHED_FIFO; + #[cfg(any( target_os = "macos", target_os = "linux", @@ -143,9 +163,11 @@ pub mod module { ))] #[pyattr] const SCHED_OTHER: i32 = libc::SCHED_OTHER; + #[cfg(any(target_os = "linux", target_os = "android"))] #[pyattr] const SCHED_IDLE: i32 = libc::SCHED_IDLE; + #[cfg(any(target_os = "linux", target_os = "android"))] #[pyattr] const SCHED_BATCH: i32 = libc::SCHED_BATCH; @@ -153,9 +175,11 @@ pub mod module { #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))] #[pyattr] const POSIX_SPAWN_OPEN: i32 = PosixSpawnFileActionIdentifier::Open as i32; + #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))] #[pyattr] const POSIX_SPAWN_CLOSE: i32 = PosixSpawnFileActionIdentifier::Close as i32; + #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))] #[pyattr] const POSIX_SPAWN_DUP2: i32 = PosixSpawnFileActionIdentifier::Dup2 as i32; @@ -1099,12 +1123,13 @@ pub mod module { vm.ctx.new_int(euid).into() } + #[cfg(not(any(target_os = "wasi", target_os = "android")))] #[pyfunction] fn setgid(gid: Gid, vm: &VirtualMachine) -> PyResult<()> { unistd::setgid(gid).map_err(|err| err.into_pyexception(vm)) } - #[cfg(not(target_os = "redox"))] + #[cfg(not(any(target_os = "wasi", target_os = "android", target_os = "redox")))] #[pyfunction] fn setegid(egid: Gid, vm: &VirtualMachine) -> PyResult<()> { unistd::setegid(egid).map_err(|err| err.into_pyexception(vm)) @@ -1116,7 +1141,7 @@ pub mod module { .map_err(|err| err.into_pyexception(vm)) } - #[cfg(not(target_os = "redox"))] + #[cfg(not(any(target_os = "wasi", target_os = "redox")))] #[pyfunction] fn setsid(vm: &VirtualMachine) -> PyResult<()> { unistd::setsid() @@ -1162,18 +1187,19 @@ pub mod module { } } + #[cfg(not(any(target_os = "wasi", target_os = "android")))] #[pyfunction] fn setuid(uid: Uid) -> nix::Result<()> { unistd::setuid(uid) } - #[cfg(not(target_os = "redox"))] + #[cfg(not(any(target_os = "wasi", target_os = "android", target_os = "redox")))] #[pyfunction] fn seteuid(euid: Uid) -> nix::Result<()> { unistd::seteuid(euid) } - #[cfg(not(target_os = "redox"))] + #[cfg(not(any(target_os = "wasi", target_os = "android", target_os = "redox")))] #[pyfunction] fn setreuid(ruid: Uid, euid: Uid) -> nix::Result<()> { let ret = unsafe { libc::setreuid(ruid.as_raw(), euid.as_raw()) }; @@ -1271,7 +1297,7 @@ pub mod module { unistd::setresgid(rgid, egid, sgid).map_err(|err| err.into_pyexception(vm)) } - #[cfg(not(target_os = "redox"))] + #[cfg(not(any(target_os = "wasi", target_os = "android", target_os = "redox")))] #[pyfunction] fn setregid(rgid: Gid, egid: Gid) -> nix::Result<()> { let ret = unsafe { libc::setregid(rgid.as_raw(), egid.as_raw()) }; @@ -1602,9 +1628,14 @@ pub mod module { args.spawn(true, vm) } - #[pyfunction(name = "WIFSIGNALED")] - fn wifsignaled(status: i32) -> bool { - libc::WIFSIGNALED(status) + #[pyfunction(name = "WCOREDUMP")] + fn wcoredump(status: i32) -> bool { + libc::WCOREDUMP(status) + } + + #[pyfunction(name = "WIFCONTINUED")] + fn wifcontinued(status: i32) -> bool { + libc::WIFCONTINUED(status) } #[pyfunction(name = "WIFSTOPPED")] @@ -1612,14 +1643,19 @@ pub mod module { libc::WIFSTOPPED(status) } + #[pyfunction(name = "WIFSIGNALED")] + fn wifsignaled(status: i32) -> bool { + libc::WIFSIGNALED(status) + } + #[pyfunction(name = "WIFEXITED")] fn wifexited(status: i32) -> bool { libc::WIFEXITED(status) } - #[pyfunction(name = "WTERMSIG")] - fn wtermsig(status: i32) -> i32 { - libc::WTERMSIG(status) + #[pyfunction(name = "WEXITSTATUS")] + fn wexitstatus(status: i32) -> i32 { + libc::WEXITSTATUS(status) } #[pyfunction(name = "WSTOPSIG")] @@ -1627,9 +1663,9 @@ pub mod module { libc::WSTOPSIG(status) } - #[pyfunction(name = "WEXITSTATUS")] - fn wexitstatus(status: i32) -> i32 { - libc::WEXITSTATUS(status) + #[pyfunction(name = "WTERMSIG")] + fn wtermsig(status: i32) -> i32 { + libc::WTERMSIG(status) } #[pyfunction]