From 2b43d4817c9352959ce2514a7d8878dff4f98872 Mon Sep 17 00:00:00 2001 From: DimitrisJim Date: Fri, 19 May 2023 19:54:38 +0300 Subject: [PATCH 1/3] Add additional arguments to fork_exec. --- stdlib/src/posixsubprocess.rs | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/stdlib/src/posixsubprocess.rs b/stdlib/src/posixsubprocess.rs index c60cd8f3ac..5f8cb3f5b9 100644 --- a/stdlib/src/posixsubprocess.rs +++ b/stdlib/src/posixsubprocess.rs @@ -48,6 +48,7 @@ mod _posixsubprocess { macro_rules! gen_args { ($($field:ident: $t:ty),*$(,)?) => { + #[allow(dead_code)] #[derive(FromArgs)] struct ForkExecArgs { $(#[pyarg(positional)] $field: $t,)* @@ -66,14 +67,31 @@ impl TryFromObject for CStrPathLike { } gen_args! { - args: ArgSequence /* list */, exec_list: ArgSequence /* list */, - close_fds: bool, fds_to_keep: ArgSequence, - cwd: Option, env_list: Option>, - p2cread: i32, p2cwrite: i32, c2pread: i32, c2pwrite: i32, - errread: i32, errwrite: i32, errpipe_read: i32, errpipe_write: i32, - restore_signals: bool, call_setsid: bool, - gid: Option>, groups_list: Option, uid: Option>, child_umask: i32, + args: ArgSequence /* list */, + exec_list: ArgSequence /* list */, + close_fds: bool, + fds_to_keep: ArgSequence, + cwd: Option, + env_list: Option>, + p2cread: i32, + p2cwrite: i32, + c2pread: i32, + c2pwrite: i32, + errread: i32, + errwrite: i32, + errpipe_read: i32, + errpipe_write: i32, + restore_signals: bool, + call_setsid: bool, + // TODO: Difference between gid_to_set and gid_object. + // One is a `gid_t` and the other is a `PyObject` in CPython. + gid_to_set: Option>, + gid_object: PyObjectRef, + groups_list: Option, + uid: Option>, + child_umask: i32, preexec_fn: Option, + use_vfork: bool, } // can't reallocate inside of exec(), so we reallocate prior to fork() and pass this along @@ -160,7 +178,7 @@ fn exec_inner(args: &ForkExecArgs, procargs: ProcArgs) -> nix::Result { // unistd::setgroups(groups_size, groups); } - if let Some(_gid) = args.gid.as_ref() { + if let Some(_gid) = args.gid_to_set.as_ref() { // TODO: setgid // unistd::setregid(gid, gid)?; } From 7366a41b1c91ea379df99c52ebc30eb894e7bcf6 Mon Sep 17 00:00:00 2001 From: DimitrisJim Date: Fri, 19 May 2023 19:54:59 +0300 Subject: [PATCH 2/3] Update subprocess from CPython 3.11 --- Lib/subprocess.py | 143 ++++++++++++++++++----------- Lib/test/test_subprocess.py | 177 ++++++++++++++++-------------------- 2 files changed, 171 insertions(+), 149 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 2680a73603..9cadd1bf8e 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -43,6 +43,7 @@ import builtins import errno import io +import locale import os import time import signal @@ -65,16 +66,19 @@ # NOTE: We intentionally exclude list2cmdline as it is # considered an internal implementation detail. issue10838. +# use presence of msvcrt to detect Windows-like platforms (see bpo-8110) try: import msvcrt - import _winapi - _mswindows = True except ModuleNotFoundError: _mswindows = False - import _posixsubprocess - import select - import selectors else: + _mswindows = True + +# wasm32-emscripten and wasm32-wasi do not support processes +_can_fork_exec = sys.platform not in {"emscripten", "wasi"} + +if _mswindows: + import _winapi from _winapi import (CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE, STD_ERROR_HANDLE, SW_HIDE, @@ -95,6 +99,24 @@ "NORMAL_PRIORITY_CLASS", "REALTIME_PRIORITY_CLASS", "CREATE_NO_WINDOW", "DETACHED_PROCESS", "CREATE_DEFAULT_ERROR_MODE", "CREATE_BREAKAWAY_FROM_JOB"]) +else: + if _can_fork_exec: + from _posixsubprocess import fork_exec as _fork_exec + # used in methods that are called by __del__ + _waitpid = os.waitpid + _waitstatus_to_exitcode = os.waitstatus_to_exitcode + _WIFSTOPPED = os.WIFSTOPPED + _WSTOPSIG = os.WSTOPSIG + _WNOHANG = os.WNOHANG + else: + _fork_exec = None + _waitpid = None + _waitstatus_to_exitcode = None + _WIFSTOPPED = None + _WSTOPSIG = None + _WNOHANG = None + import select + import selectors # Exception classes used by this module. @@ -207,8 +229,7 @@ def Detach(self): def __repr__(self): return "%s(%d)" % (self.__class__.__name__, int(self)) - # XXX: RustPython; OSError('The handle is invalid. (os error 6)') - # __del__ = Close + __del__ = Close else: # When select or poll has indicated that the file is writable, # we can write up to _PIPE_BUF bytes without risk of blocking. @@ -303,12 +324,14 @@ def _args_from_interpreter_flags(): args.append('-E') if sys.flags.no_user_site: args.append('-s') + if sys.flags.safe_path: + args.append('-P') # -W options warnopts = sys.warnoptions[:] - bytes_warning = sys.flags.bytes_warning xoptions = getattr(sys, '_xoptions', {}) - dev_mode = ('dev' in xoptions) + bytes_warning = sys.flags.bytes_warning + dev_mode = sys.flags.dev_mode if bytes_warning > 1: warnopts.remove("error::BytesWarning") @@ -335,6 +358,26 @@ def _args_from_interpreter_flags(): return args +def _text_encoding(): + # Return default text encoding and emit EncodingWarning if + # sys.flags.warn_default_encoding is true. + if sys.flags.warn_default_encoding: + f = sys._getframe() + filename = f.f_code.co_filename + stacklevel = 2 + while f := f.f_back: + if f.f_code.co_filename != filename: + break + stacklevel += 1 + warnings.warn("'encoding' argument not specified.", + EncodingWarning, stacklevel) + + if sys.flags.utf8_mode: + return "utf-8" + else: + return locale.getencoding() + + def call(*popenargs, timeout=None, **kwargs): """Run command with arguments. Wait for command to complete or timeout, then return the returncode attribute. @@ -406,13 +449,15 @@ def check_output(*popenargs, timeout=None, **kwargs): decoded according to locale encoding, or by "encoding" if set. Text mode is triggered by setting any of text, encoding, errors or universal_newlines. """ - if 'stdout' in kwargs: - raise ValueError('stdout argument not allowed, it will be overridden.') + for kw in ('stdout', 'check'): + if kw in kwargs: + raise ValueError(f'{kw} argument not allowed, it will be overridden.') if 'input' in kwargs and kwargs['input'] is None: # Explicitly passing input=None was previously equivalent to passing an # empty string. That is maintained here for backwards compatibility. - if kwargs.get('universal_newlines') or kwargs.get('text'): + if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') \ + or kwargs.get('errors'): empty = '' else: empty = b'' @@ -464,7 +509,8 @@ def run(*popenargs, The returned instance will have attributes args, returncode, stdout and stderr. By default, stdout and stderr are not captured, and those attributes - will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them. + will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them, + or pass capture_output=True to capture both. If check is True and the exit code was non-zero, it raises a CalledProcessError. The CalledProcessError object will have the return code @@ -600,7 +646,7 @@ def list2cmdline(seq): # Various tools for executing commands and looking at their output and status. # -def getstatusoutput(cmd): +def getstatusoutput(cmd, *, encoding=None, errors=None): """Return (exitcode, output) of executing cmd in a shell. Execute the string 'cmd' in a shell with 'check_output' and @@ -622,7 +668,8 @@ def getstatusoutput(cmd): (-15, '') """ try: - data = check_output(cmd, shell=True, text=True, stderr=STDOUT) + data = check_output(cmd, shell=True, text=True, stderr=STDOUT, + encoding=encoding, errors=errors) exitcode = 0 except CalledProcessError as ex: data = ex.output @@ -631,7 +678,7 @@ def getstatusoutput(cmd): data = data[:-1] return exitcode, data -def getoutput(cmd): +def getoutput(cmd, *, encoding=None, errors=None): """Return output (stdout or stderr) of executing cmd in a shell. Like getstatusoutput(), except the exit status is ignored and the return @@ -641,7 +688,8 @@ def getoutput(cmd): >>> subprocess.getoutput('ls /bin/ls') '/bin/ls' """ - return getstatusoutput(cmd)[1] + return getstatusoutput(cmd, encoding=encoding, errors=errors)[1] + def _use_posix_spawn(): @@ -736,6 +784,8 @@ class Popen: start_new_session (POSIX only) + process_group (POSIX only) + group (POSIX only) extra_groups (POSIX only) @@ -761,8 +811,14 @@ def __init__(self, args, bufsize=-1, executable=None, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, user=None, group=None, extra_groups=None, - encoding=None, errors=None, text=None, umask=-1, pipesize=-1): + encoding=None, errors=None, text=None, umask=-1, pipesize=-1, + process_group=None): """Create new Popen instance.""" + if not _can_fork_exec: + raise OSError( + errno.ENOTSUP, f"{sys.platform} does not support processes." + ) + _cleanup() # Held while anything is calling waitpid before returncode has been # updated to prevent clobbering returncode if wait() or poll() are @@ -848,15 +904,8 @@ def __init__(self, args, bufsize=-1, executable=None, errread = msvcrt.open_osfhandle(errread.Detach(), 0) self.text_mode = encoding or errors or text or universal_newlines - - # PEP 597: We suppress the EncodingWarning in subprocess module - # for now (at Python 3.10), because we focus on files for now. - # This will be changed to encoding = io.text_encoding(encoding) - # in the future. if self.text_mode and encoding is None: - # TODO: RUSTPYTHON; encoding `locale` is not supported yet - pass - # self.encoding = encoding = "locale" + self.encoding = encoding = _text_encoding() # How long to resume waiting on a child after the first ^C. # There is no right value for this. The purpose is to be polite @@ -874,6 +923,9 @@ def __init__(self, args, bufsize=-1, executable=None, else: line_buffering = False + if process_group is None: + process_group = -1 # The internal APIs are int-only + gid = None if group is not None: if not hasattr(os, 'setregid'): @@ -977,7 +1029,7 @@ def __init__(self, args, bufsize=-1, executable=None, errread, errwrite, restore_signals, gid, gids, uid, umask, - start_new_session) + start_new_session, process_group) except: # Cleanup if the child failed starting. for f in filter(None, (self.stdin, self.stdout, self.stderr)): @@ -1285,11 +1337,7 @@ def _get_handles(self, stdin, stdout, stderr): else: # Assuming file-like object p2cread = msvcrt.get_osfhandle(stdin.fileno()) - # XXX RUSTPYTHON TODO: figure out why closing these old, non-inheritable - # pipe handles is necessary for us, but not CPython - old = p2cread p2cread = self._make_inheritable(p2cread) - if stdin == PIPE: _winapi.CloseHandle(old) if stdout is None: c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE) @@ -1307,11 +1355,7 @@ def _get_handles(self, stdin, stdout, stderr): else: # Assuming file-like object c2pwrite = msvcrt.get_osfhandle(stdout.fileno()) - # XXX RUSTPYTHON TODO: figure out why closing these old, non-inheritable - # pipe handles is necessary for us, but not CPython - old = c2pwrite c2pwrite = self._make_inheritable(c2pwrite) - if stdout == PIPE: _winapi.CloseHandle(old) if stderr is None: errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE) @@ -1331,11 +1375,7 @@ def _get_handles(self, stdin, stdout, stderr): else: # Assuming file-like object errwrite = msvcrt.get_osfhandle(stderr.fileno()) - # XXX RUSTPYTHON TODO: figure out why closing these old, non-inheritable - # pipe handles is necessary for us, but not CPython - old = errwrite errwrite = self._make_inheritable(errwrite) - if stderr == PIPE: _winapi.CloseHandle(old) return (p2cread, p2cwrite, c2pread, c2pwrite, @@ -1373,7 +1413,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds, unused_restore_signals, unused_gid, unused_gids, unused_uid, unused_umask, - unused_start_new_session): + unused_start_new_session, unused_process_group): """Execute program (MS Windows version)""" assert not pass_fds, "pass_fds not supported on Windows." @@ -1705,7 +1745,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds, errread, errwrite, restore_signals, gid, gids, uid, umask, - start_new_session): + start_new_session, process_group): """Execute program (POSIX version)""" if isinstance(args, (str, bytes)): @@ -1741,6 +1781,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds, and (c2pwrite == -1 or c2pwrite > 2) and (errwrite == -1 or errwrite > 2) and not start_new_session + and process_group == -1 and gid is None and gids is None and uid is None @@ -1790,7 +1831,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds, for dir in os.get_exec_path(env)) fds_to_keep = set(pass_fds) fds_to_keep.add(errpipe_write) - self.pid = _posixsubprocess.fork_exec( + self.pid = _fork_exec( args, executable_list, close_fds, tuple(sorted(map(int, fds_to_keep))), cwd, env_list, @@ -1798,8 +1839,8 @@ def _execute_child(self, args, executable, preexec_fn, close_fds, errread, errwrite, errpipe_read, errpipe_write, restore_signals, start_new_session, - gid, gids, uid, umask, - preexec_fn) + process_group, gid, gids, uid, umask, + preexec_fn, _USE_VFORK) self._child_created = True finally: # be sure the FD is closed no matter what @@ -1862,19 +1903,19 @@ def _execute_child(self, args, executable, preexec_fn, close_fds, def _handle_exitstatus(self, sts, - waitstatus_to_exitcode=os.waitstatus_to_exitcode, - _WIFSTOPPED=os.WIFSTOPPED, - _WSTOPSIG=os.WSTOPSIG): + _waitstatus_to_exitcode=_waitstatus_to_exitcode, + _WIFSTOPPED=_WIFSTOPPED, + _WSTOPSIG=_WSTOPSIG): """All callers to this function MUST hold self._waitpid_lock.""" # This method is called (indirectly) by __del__, so it cannot # refer to anything outside of its local scope. if _WIFSTOPPED(sts): self.returncode = -_WSTOPSIG(sts) else: - self.returncode = waitstatus_to_exitcode(sts) + self.returncode = _waitstatus_to_exitcode(sts) - def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid, - _WNOHANG=os.WNOHANG, _ECHILD=errno.ECHILD): + def _internal_poll(self, _deadstate=None, _waitpid=_waitpid, + _WNOHANG=_WNOHANG, _ECHILD=errno.ECHILD): """Check if child process has terminated. Returns returncode attribute. @@ -2105,7 +2146,7 @@ def send_signal(self, sig): try: os.kill(self.pid, sig) except ProcessLookupError: - # Supress the race condition error; bpo-40550. + # Suppress the race condition error; bpo-40550. pass def terminate(self): diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 2d263fa865..424a4a93b6 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -48,6 +48,9 @@ if support.PGO: raise unittest.SkipTest("test is not helpful for PGO") +if not support.has_subprocess_support: + raise unittest.SkipTest("test module requires subprocess") + mswindows = (sys.platform == "win32") # @@ -171,6 +174,14 @@ def test_check_output(self): [sys.executable, "-c", "print('BDFL')"]) self.assertIn(b'BDFL', output) + with self.assertRaisesRegex(ValueError, + "stdout argument not allowed, it will be overridden"): + subprocess.check_output([], stdout=None) + + with self.assertRaisesRegex(ValueError, + "check argument not allowed, it will be overridden"): + subprocess.check_output([], check=False) + def test_check_output_nonzero(self): # check_call() function with non-zero return code with self.assertRaises(subprocess.CalledProcessError) as c: @@ -227,6 +238,12 @@ def test_check_output_input_none_universal_newlines(self): input=None, universal_newlines=True) self.assertNotIn('XX', output) + def test_check_output_input_none_encoding_errors(self): + output = subprocess.check_output( + [sys.executable, "-c", "print('foo')"], + input=None, encoding='utf-8', errors='ignore') + self.assertIn('foo', output) + def test_check_output_stdout_arg(self): # check_output() refuses to accept 'stdout' argument with self.assertRaises(ValueError) as c: @@ -719,6 +736,8 @@ def test_pipesizes(self): # However, this function is not yet in _winapi. p.stdin.write(b"pear") p.stdin.close() + p.stdout.close() + p.stderr.close() finally: p.kill() p.wait() @@ -746,6 +765,8 @@ def test_pipesize_default(self): # On other platforms we cannot test the pipe size (yet). But above # code using pipesize=-1 should not crash. p.stdin.close() + p.stdout.close() + p.stderr.close() finally: p.kill() p.wait() @@ -790,7 +811,6 @@ def is_env_var_to_ignore(n): if not is_env_var_to_ignore(k)] self.assertEqual(child_env_names, []) - @unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, null byte is not checked") def test_invalid_cmd(self): # null character in the command name cmd = sys.executable + '\0' @@ -936,7 +956,6 @@ def test_communicate_returns(self): self.assertEqual(stdout, None) self.assertEqual(stderr, None) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_communicate_pipe_buf(self): # communicate() with writes larger than pipe_buf # This test will probably deadlock rather than fail, if @@ -976,8 +995,6 @@ def test_writes_before_communicate(self): self.assertEqual(stdout, b"bananasplit") self.assertEqual(stderr, b"") - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_universal_newlines_and_text(self): args = [ sys.executable, "-c", @@ -1017,7 +1034,6 @@ def test_universal_newlines_and_text(self): self.assertEqual(p.stdout.read(), "line4\nline5\nline6\nline7\nline8") - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_universal_newlines_communicate(self): # universal newlines through communicate() p = subprocess.Popen([sys.executable, "-c", @@ -1069,7 +1085,6 @@ def test_universal_newlines_communicate_input_none(self): p.communicate() self.assertEqual(p.returncode, 0) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_universal_newlines_communicate_stdin_stdout_stderr(self): # universal newlines through communicate(), with stdin, stdout, stderr p = subprocess.Popen([sys.executable, "-c", @@ -1098,8 +1113,6 @@ def test_universal_newlines_communicate_stdin_stdout_stderr(self): # to stderr at exit of subprocess. self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_universal_newlines_communicate_encodings(self): # Check that universal newlines mode works for various encodings, # in particular for encodings in the UTF-16 and UTF-32 families. @@ -1122,8 +1135,6 @@ def test_universal_newlines_communicate_encodings(self): stdout, stderr = popen.communicate(input='') self.assertEqual(stdout, '1\n2\n3\n4') - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_communicate_errors(self): for errors, expected in [ ('ignore', ''), @@ -1263,15 +1274,12 @@ def _test_bufsize_equal_one(self, line, expected, universal_newlines): self.assertEqual(p.returncode, 0) self.assertEqual(read_line, expected) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_bufsize_equal_one_text_mode(self): # line is flushed in text mode with bufsize=1. # we should get the full line in return line = "line\n" self._test_bufsize_equal_one(line, line, universal_newlines=True) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_bufsize_equal_one_binary_mode(self): # line is not flushed in binary mode with bufsize=1. # we should get empty response @@ -1444,7 +1452,6 @@ def test_handles_closed_on_exception(self): self.assertFalse(os.path.exists(ofname)) self.assertFalse(os.path.exists(efname)) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_communicate_epipe(self): # Issue 10963: communicate() should hide EPIPE p = subprocess.Popen(ZERO_RETURN_CMD, @@ -1475,7 +1482,6 @@ def test_repr(self): p.returncode = code self.assertEqual(repr(p), sx) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_communicate_epipe_only_stdin(self): # Issue 10963: communicate() should hide EPIPE p = subprocess.Popen(ZERO_RETURN_CMD, @@ -1484,8 +1490,6 @@ def test_communicate_epipe_only_stdin(self): p.wait() p.communicate(b"x" * 2**20) - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), "Requires signal.SIGUSR1") @unittest.skipUnless(hasattr(os, 'kill'), @@ -1535,8 +1539,6 @@ def test_file_not_found_includes_filename(self): subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf(mswindows, "behavior currently not supported on Windows") def test_file_not_found_with_bad_cwd(self): with self.assertRaises(FileNotFoundError) as c: @@ -1547,6 +1549,22 @@ def test_class_getitems(self): self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias) self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias) + @unittest.skipIf(not sysconfig.get_config_var("HAVE_VFORK"), + "vfork() not enabled by configure.") + @mock.patch("subprocess._fork_exec") + def test__use_vfork(self, mock_fork_exec): + self.assertTrue(subprocess._USE_VFORK) # The default value regardless. + mock_fork_exec.side_effect = RuntimeError("just testing args") + with self.assertRaises(RuntimeError): + subprocess.run([sys.executable, "-c", "pass"]) + mock_fork_exec.assert_called_once() + self.assertTrue(mock_fork_exec.call_args.args[-1]) + with mock.patch.object(subprocess, '_USE_VFORK', False): + with self.assertRaises(RuntimeError): + subprocess.run([sys.executable, "-c", "pass"]) + self.assertFalse(mock_fork_exec.call_args_list[-1].args[-1]) + + class RunFuncTestCase(BaseTestCase): def run_python(self, code, **kwargs): """Run Python code in a subprocess using subprocess.run""" @@ -1721,27 +1739,18 @@ def test_run_with_shell_timeout_and_capture_output(self): msg="TimeoutExpired was delayed! Bad traceback:\n```\n" f"{stacks}```") - @unittest.skipIf(not sysconfig.get_config_var("HAVE_VFORK"), - "vfork() not enabled by configure.") - def test__use_vfork(self): - # Attempts code coverage within _posixsubprocess.c on the code that - # probes the subprocess module for the existence and value of this - # attribute in 3.10.5. - self.assertTrue(subprocess._USE_VFORK) # The default value regardless. - with mock.patch.object(subprocess, "_USE_VFORK", False): - self.assertEqual(self.run_python("pass").returncode, 0, - msg="False _USE_VFORK failed") - - class RaisingBool: - def __bool__(self): - raise RuntimeError("force PyObject_IsTrue to return -1") - - with mock.patch.object(subprocess, "_USE_VFORK", RaisingBool()): - self.assertEqual(self.run_python("pass").returncode, 0, - msg="odd bool()-error _USE_VFORK failed") - del subprocess._USE_VFORK - self.assertEqual(self.run_python("pass").returncode, 0, - msg="lack of a _USE_VFORK attribute failed") + def test_encoding_warning(self): + code = textwrap.dedent("""\ + from subprocess import * + run("echo hello", shell=True, text=True) + check_output("echo hello", shell=True, text=True) + """) + cp = subprocess.run([sys.executable, "-Xwarn_default_encoding", "-c", code], + capture_output=True) + lines = cp.stderr.splitlines() + self.assertEqual(len(lines), 2, lines) + self.assertTrue(lines[0].startswith(b":2: EncodingWarning: ")) + self.assertTrue(lines[1].startswith(b":3: EncodingWarning: ")) def _get_test_grp_name(): @@ -1776,8 +1785,6 @@ def _get_chdir_exception(self): self._nonexistent_dir) return desired_exception - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_exception_cwd(self): """Test error in the child raised in the parent for a bad cwd.""" desired_exception = self._get_chdir_exception() @@ -1793,8 +1800,6 @@ def test_exception_cwd(self): else: self.fail("Expected OSError: %s" % desired_exception) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_exception_bad_executable(self): """Test error in the child raised in the parent for a bad executable.""" desired_exception = self._get_chdir_exception() @@ -1810,8 +1815,6 @@ def test_exception_bad_executable(self): else: self.fail("Expected OSError: %s" % desired_exception) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_exception_bad_args_0(self): """Test error in the child raised in the parent for a bad args[0].""" desired_exception = self._get_chdir_exception() @@ -1837,7 +1840,7 @@ class PopenNoDestructor(subprocess.Popen): def __del__(self): pass - @mock.patch("subprocess._posixsubprocess.fork_exec") + @mock.patch("subprocess._fork_exec") def test_exception_errpipe_normal(self, fork_exec): """Test error passing done through errpipe_write in the good case""" def proper_error(*args): @@ -1854,7 +1857,7 @@ def proper_error(*args): with self.assertRaises(IsADirectoryError): self.PopenNoDestructor(["non_existent_command"]) - @mock.patch("subprocess._posixsubprocess.fork_exec") + @mock.patch("subprocess._fork_exec") def test_exception_errpipe_bad_data(self, fork_exec): """Test error passing done through errpipe_write where its not in the expected format""" @@ -1876,8 +1879,6 @@ def bad_error(*args): self.assertIn(repr(error_data), str(e.exception)) - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf(not os.path.exists('/proc/self/status'), "need /proc/self/status") def test_restore_signals(self): @@ -1910,16 +1911,32 @@ def test_start_new_session(self): output = subprocess.check_output( [sys.executable, "-c", "import os; print(os.getsid(0))"], start_new_session=True) - except OSError as e: + except PermissionError as e: if e.errno != errno.EPERM: - raise + raise # EACCES? else: parent_sid = os.getsid(0) child_sid = int(output) self.assertNotEqual(parent_sid, child_sid) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.skipUnless(hasattr(os, 'setpgid') and hasattr(os, 'getpgid'), + 'no setpgid or getpgid on platform') + def test_process_group_0(self): + # For code coverage of calling setpgid(). We don't care if we get an + # EPERM error from it depending on the test execution environment, that + # still indicates that it was called. + try: + output = subprocess.check_output( + [sys.executable, "-c", "import os; print(os.getpgid(0))"], + process_group=0) + except PermissionError as e: + if e.errno != errno.EPERM: + raise # EACCES? + else: + parent_pgid = os.getpgid(0) + child_pgid = int(output) + self.assertNotEqual(parent_pgid, child_pgid) + @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform') def test_user(self): # For code coverage of the user parameter. We don't care if we get an @@ -1977,8 +1994,6 @@ def test_user_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, user=65535) - # TODO: RUSTPYTHON, observed gids do not match expected gids - @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform') def test_group(self): gid = os.getegid() @@ -2026,8 +2041,6 @@ def test_group_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, group=65535) - # TODO: RUSTPYTHON, observed gids do not match expected gids - @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') def test_extra_groups(self): gid = os.getegid() @@ -2082,8 +2095,6 @@ def test_extra_groups_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[]) - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf(mswindows or not hasattr(os, 'umask'), 'POSIX umask() is not available.') def test_umask(self): @@ -2135,8 +2146,6 @@ def test_CalledProcessError_str_non_zero(self): error_string = str(err) self.assertIn("non-zero exit status 2.", error_string) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_preexec(self): # DISCLAIMER: Setting environment variables is *not* a good use # of a preexec_fn. This is merely a test. @@ -2148,8 +2157,6 @@ def test_preexec(self): with p: self.assertEqual(p.stdout.read(), b"apple") - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_preexec_exception(self): def raise_it(): raise ValueError("What if two swallows carried a coconut?") @@ -2158,7 +2165,7 @@ def raise_it(): preexec_fn=raise_it) except subprocess.SubprocessError as e: self.assertTrue( - subprocess._posixsubprocess, + subprocess._fork_exec, "Expected a ValueError from the preexec_fn") except ValueError as e: self.assertIn("coconut", e.args[0]) @@ -2191,8 +2198,6 @@ def _execute_child(self, *args, **kwargs): for fd in devzero_fds: os.close(fd) - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") def test_preexec_errpipe_does_not_double_close_pipes(self): """Issue16140: Don't double close pipes on preexec error.""" @@ -2207,8 +2212,6 @@ def raise_it(): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=raise_it) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_preexec_gc_module_failure(self): # This tests the code that disables garbage collection if the child # process will execute any Python. @@ -2230,8 +2233,6 @@ def test_preexec_gc_module_failure(self): if not enabled: gc.disable() - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf( sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') def test_preexec_fork_failure(self): @@ -2642,8 +2643,6 @@ def test_swap_std_fds_with_one_closed(self): for to_fds in itertools.permutations(range(3), 2): self._check_swap_std_fds_with_one_closed(from_fds, to_fds) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_surrogates_error_message(self): def prepare(): raise ValueError("surrogate:\uDCff") @@ -2654,17 +2653,15 @@ def prepare(): preexec_fn=prepare) except ValueError as err: # Pure Python implementations keeps the message - self.assertIsNone(subprocess._posixsubprocess) + self.assertIsNone(subprocess._fork_exec) self.assertEqual(str(err), "surrogate:\uDCff") except subprocess.SubprocessError as err: # _posixsubprocess uses a default message - self.assertIsNotNone(subprocess._posixsubprocess) + self.assertIsNotNone(subprocess._fork_exec) self.assertEqual(str(err), "Exception occurred in preexec_fn.") else: self.fail("Expected ValueError or subprocess.SubprocessError") - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_undecodable_env(self): for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): encoded_value = value.encode("ascii", "surrogateescape") @@ -2785,7 +2782,6 @@ def kill_p2(): p1.stdout.close() p2.stdout.close() - @unittest.skip("TODO: RUSTPYTHON, flaky test") def test_close_fds(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -2913,7 +2909,6 @@ def test_close_fds_when_max_fd_is_lowered(self): msg="Some fds were left open.") - @unittest.skip("TODO: RUSTPYTHON, flaky test") # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file # descriptor of a pipe closed in the parent process is valid in the # child process according to fstat(), but the mode of the file @@ -3121,8 +3116,6 @@ def test_leak_fast_process_del_killed(self): else: self.assertNotIn(ident, [id(o) for o in subprocess._active]) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_close_fds_after_preexec(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -3165,9 +3158,9 @@ def test_fork_exec(self): True, (), cwd, env_list, -1, -1, -1, -1, 1, 2, 3, 4, - True, True, + True, True, 0, False, [], 0, -1, - func) + func, False) # Attempt to prevent # "TypeError: fork_exec() takes exactly N arguments (M given)" # from passing the test. More refactoring to have us start @@ -3214,9 +3207,9 @@ def __int__(self): True, fds_to_keep, None, [b"env"], -1, -1, -1, -1, 1, 2, 3, 4, - True, True, + True, True, 0, None, None, None, -1, - None) + None, "no vfork") self.assertIn('fds_to_keep', str(c.exception)) finally: if not gc_enabled: @@ -3319,6 +3312,7 @@ def test_send_signal_race2(self): with mock.patch.object(p, 'poll', new=lambda: None): p.returncode = None p.send_signal(signal.SIGTERM) + p.kill() def test_communicate_repeated_call_after_stdout_close(self): proc = subprocess.Popen([sys.executable, '-c', @@ -3424,8 +3418,6 @@ def test_close_fds(self): close_fds=True) self.assertEqual(rc, 47) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_close_fds_with_stdio(self): import msvcrt @@ -3508,8 +3500,6 @@ def test_shell_string(self): with p: self.assertIn(b"physalis", p.stdout.read()) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_shell_encodings(self): # Run command through the shell (string) for enc in ['ansi', 'oem']: @@ -3656,7 +3646,6 @@ def popen_via_context_manager(*args, **kwargs): raise KeyboardInterrupt # Test how __exit__ handles ^C. self._test_keyboardinterrupt_no_kill(popen_via_context_manager) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_getoutput(self): self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), @@ -3729,28 +3718,20 @@ def with_spaces(self, *args, **kwargs): "2 [%r, 'ab cd']" % self.fname ) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_shell_string_with_spaces(self): # call() function with string argument with spaces on Windows self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd"), shell=1) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_shell_sequence_with_spaces(self): # call() function with sequence argument with spaces on Windows self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_noshell_string_with_spaces(self): # call() function with string argument with spaces on Windows self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_noshell_sequence_with_spaces(self): # call() function with sequence argument with spaces on Windows self.with_spaces([sys.executable, self.fname, "ab cd"]) From 283a8046aade9ed676e49bdd44a0587da6e1028a Mon Sep 17 00:00:00 2001 From: DimitrisJim Date: Fri, 19 May 2023 22:38:38 +0300 Subject: [PATCH 3/3] Mark failing tests. --- Lib/test/test_subprocess.py | 70 +++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 424a4a93b6..0a72ffade8 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -811,6 +811,7 @@ def is_env_var_to_ignore(n): if not is_env_var_to_ignore(k)] self.assertEqual(child_env_names, []) + @unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, null byte is not checked") def test_invalid_cmd(self): # null character in the command name cmd = sys.executable + '\0' @@ -956,6 +957,7 @@ def test_communicate_returns(self): self.assertEqual(stdout, None) self.assertEqual(stderr, None) + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_communicate_pipe_buf(self): # communicate() with writes larger than pipe_buf # This test will probably deadlock rather than fail, if @@ -995,6 +997,8 @@ def test_writes_before_communicate(self): self.assertEqual(stdout, b"bananasplit") self.assertEqual(stderr, b"") + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_universal_newlines_and_text(self): args = [ sys.executable, "-c", @@ -1034,6 +1038,7 @@ def test_universal_newlines_and_text(self): self.assertEqual(p.stdout.read(), "line4\nline5\nline6\nline7\nline8") + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_universal_newlines_communicate(self): # universal newlines through communicate() p = subprocess.Popen([sys.executable, "-c", @@ -1085,6 +1090,7 @@ def test_universal_newlines_communicate_input_none(self): p.communicate() self.assertEqual(p.returncode, 0) + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_universal_newlines_communicate_stdin_stdout_stderr(self): # universal newlines through communicate(), with stdin, stdout, stderr p = subprocess.Popen([sys.executable, "-c", @@ -1113,6 +1119,8 @@ def test_universal_newlines_communicate_stdin_stdout_stderr(self): # to stderr at exit of subprocess. self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_universal_newlines_communicate_encodings(self): # Check that universal newlines mode works for various encodings, # in particular for encodings in the UTF-16 and UTF-32 families. @@ -1135,6 +1143,8 @@ def test_universal_newlines_communicate_encodings(self): stdout, stderr = popen.communicate(input='') self.assertEqual(stdout, '1\n2\n3\n4') + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_communicate_errors(self): for errors, expected in [ ('ignore', ''), @@ -1274,12 +1284,15 @@ def _test_bufsize_equal_one(self, line, expected, universal_newlines): self.assertEqual(p.returncode, 0) self.assertEqual(read_line, expected) + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_bufsize_equal_one_text_mode(self): # line is flushed in text mode with bufsize=1. # we should get the full line in return line = "line\n" self._test_bufsize_equal_one(line, line, universal_newlines=True) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_bufsize_equal_one_binary_mode(self): # line is not flushed in binary mode with bufsize=1. # we should get empty response @@ -1452,6 +1465,7 @@ def test_handles_closed_on_exception(self): self.assertFalse(os.path.exists(ofname)) self.assertFalse(os.path.exists(efname)) + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_communicate_epipe(self): # Issue 10963: communicate() should hide EPIPE p = subprocess.Popen(ZERO_RETURN_CMD, @@ -1482,6 +1496,7 @@ def test_repr(self): p.returncode = code self.assertEqual(repr(p), sx) + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_communicate_epipe_only_stdin(self): # Issue 10963: communicate() should hide EPIPE p = subprocess.Popen(ZERO_RETURN_CMD, @@ -1490,6 +1505,8 @@ def test_communicate_epipe_only_stdin(self): p.wait() p.communicate(b"x" * 2**20) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), "Requires signal.SIGUSR1") @unittest.skipUnless(hasattr(os, 'kill'), @@ -1539,6 +1556,8 @@ def test_file_not_found_includes_filename(self): subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipIf(mswindows, "behavior currently not supported on Windows") def test_file_not_found_with_bad_cwd(self): with self.assertRaises(FileNotFoundError) as c: @@ -1739,6 +1758,8 @@ def test_run_with_shell_timeout_and_capture_output(self): msg="TimeoutExpired was delayed! Bad traceback:\n```\n" f"{stacks}```") + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_encoding_warning(self): code = textwrap.dedent("""\ from subprocess import * @@ -1785,6 +1806,8 @@ def _get_chdir_exception(self): self._nonexistent_dir) return desired_exception + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_exception_cwd(self): """Test error in the child raised in the parent for a bad cwd.""" desired_exception = self._get_chdir_exception() @@ -1800,6 +1823,8 @@ def test_exception_cwd(self): else: self.fail("Expected OSError: %s" % desired_exception) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_exception_bad_executable(self): """Test error in the child raised in the parent for a bad executable.""" desired_exception = self._get_chdir_exception() @@ -1815,6 +1840,8 @@ def test_exception_bad_executable(self): else: self.fail("Expected OSError: %s" % desired_exception) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_exception_bad_args_0(self): """Test error in the child raised in the parent for a bad args[0].""" desired_exception = self._get_chdir_exception() @@ -1879,6 +1906,8 @@ def bad_error(*args): self.assertIn(repr(error_data), str(e.exception)) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipIf(not os.path.exists('/proc/self/status'), "need /proc/self/status") def test_restore_signals(self): @@ -1919,6 +1948,8 @@ def test_start_new_session(self): child_sid = int(output) self.assertNotEqual(parent_sid, child_sid) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'setpgid') and hasattr(os, 'getpgid'), 'no setpgid or getpgid on platform') def test_process_group_0(self): @@ -1937,6 +1968,8 @@ def test_process_group_0(self): child_pgid = int(output) self.assertNotEqual(parent_pgid, child_pgid) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform') def test_user(self): # For code coverage of the user parameter. We don't care if we get an @@ -1994,6 +2027,8 @@ def test_user_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, user=65535) + # TODO: RUSTPYTHON, observed gids do not match expected gids + @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform') def test_group(self): gid = os.getegid() @@ -2041,6 +2076,8 @@ def test_group_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, group=65535) + # TODO: RUSTPYTHON, observed gids do not match expected gids + @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') def test_extra_groups(self): gid = os.getegid() @@ -2095,6 +2132,8 @@ def test_extra_groups_error(self): with self.assertRaises(ValueError): subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[]) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipIf(mswindows or not hasattr(os, 'umask'), 'POSIX umask() is not available.') def test_umask(self): @@ -2146,6 +2185,8 @@ def test_CalledProcessError_str_non_zero(self): error_string = str(err) self.assertIn("non-zero exit status 2.", error_string) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_preexec(self): # DISCLAIMER: Setting environment variables is *not* a good use # of a preexec_fn. This is merely a test. @@ -2157,6 +2198,8 @@ def test_preexec(self): with p: self.assertEqual(p.stdout.read(), b"apple") + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_preexec_exception(self): def raise_it(): raise ValueError("What if two swallows carried a coconut?") @@ -2198,6 +2241,8 @@ def _execute_child(self, *args, **kwargs): for fd in devzero_fds: os.close(fd) + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") def test_preexec_errpipe_does_not_double_close_pipes(self): """Issue16140: Don't double close pipes on preexec error.""" @@ -2212,6 +2257,8 @@ def raise_it(): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=raise_it) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_preexec_gc_module_failure(self): # This tests the code that disables garbage collection if the child # process will execute any Python. @@ -2233,6 +2280,8 @@ def test_preexec_gc_module_failure(self): if not enabled: gc.disable() + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipIf( sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') def test_preexec_fork_failure(self): @@ -2643,6 +2692,8 @@ def test_swap_std_fds_with_one_closed(self): for to_fds in itertools.permutations(range(3), 2): self._check_swap_std_fds_with_one_closed(from_fds, to_fds) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_surrogates_error_message(self): def prepare(): raise ValueError("surrogate:\uDCff") @@ -2662,6 +2713,8 @@ def prepare(): else: self.fail("Expected ValueError or subprocess.SubprocessError") + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_undecodable_env(self): for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): encoded_value = value.encode("ascii", "surrogateescape") @@ -2782,6 +2835,7 @@ def kill_p2(): p1.stdout.close() p2.stdout.close() + @unittest.skip("TODO: RUSTPYTHON, flaky test") def test_close_fds(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -2909,6 +2963,7 @@ def test_close_fds_when_max_fd_is_lowered(self): msg="Some fds were left open.") + @unittest.skip("TODO: RUSTPYTHON, flaky test") # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file # descriptor of a pipe closed in the parent process is valid in the # child process according to fstat(), but the mode of the file @@ -3116,6 +3171,8 @@ def test_leak_fast_process_del_killed(self): else: self.assertNotIn(ident, [id(o) for o in subprocess._active]) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_close_fds_after_preexec(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") @@ -3418,6 +3475,8 @@ def test_close_fds(self): close_fds=True) self.assertEqual(rc, 47) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_close_fds_with_stdio(self): import msvcrt @@ -3500,6 +3559,8 @@ def test_shell_string(self): with p: self.assertIn(b"physalis", p.stdout.read()) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_shell_encodings(self): # Run command through the shell (string) for enc in ['ansi', 'oem']: @@ -3646,6 +3707,7 @@ def popen_via_context_manager(*args, **kwargs): raise KeyboardInterrupt # Test how __exit__ handles ^C. self._test_keyboardinterrupt_no_kill(popen_via_context_manager) + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_getoutput(self): self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), @@ -3718,20 +3780,28 @@ def with_spaces(self, *args, **kwargs): "2 [%r, 'ab cd']" % self.fname ) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_shell_string_with_spaces(self): # call() function with string argument with spaces on Windows self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd"), shell=1) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_shell_sequence_with_spaces(self): # call() function with sequence argument with spaces on Windows self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_noshell_string_with_spaces(self): # call() function with string argument with spaces on Windows self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_noshell_sequence_with_spaces(self): # call() function with sequence argument with spaces on Windows self.with_spaces([sys.executable, self.fname, "ab cd"])