diff --git a/Lib/pty.py b/Lib/pty.py index 8d8ce40df5..1d97994abe 100644 --- a/Lib/pty.py +++ b/Lib/pty.py @@ -40,6 +40,9 @@ def master_open(): Open a pty master and return the fd, and the filename of the slave end. Deprecated, use openpty() instead.""" + import warnings + warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + try: master_fd, slave_fd = os.openpty() except (AttributeError, OSError): @@ -69,6 +72,9 @@ def slave_open(tty_name): opened filedescriptor. Deprecated, use openpty() instead.""" + import warnings + warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + result = os.open(tty_name, os.O_RDWR) try: from fcntl import ioctl, I_PUSH @@ -101,32 +107,14 @@ def fork(): master_fd, slave_fd = openpty() pid = os.fork() if pid == CHILD: - # Establish a new session. - os.setsid() os.close(master_fd) - - # Slave becomes stdin/stdout/stderr of child. - os.dup2(slave_fd, STDIN_FILENO) - os.dup2(slave_fd, STDOUT_FILENO) - os.dup2(slave_fd, STDERR_FILENO) - if slave_fd > STDERR_FILENO: - os.close(slave_fd) - - # Explicitly open the tty to make it become a controlling tty. - tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR) - os.close(tmp_fd) + os.login_tty(slave_fd) else: os.close(slave_fd) # Parent and child process. return pid, master_fd -def _writen(fd, data): - """Write all the data to a descriptor.""" - while data: - n = os.write(fd, data) - data = data[n:] - def _read(fd): """Default read function.""" return os.read(fd, 1024) @@ -136,9 +124,42 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): Copies pty master -> standard output (master_read) standard input -> pty master (stdin_read)""" - fds = [master_fd, STDIN_FILENO] - while fds: - rfds, _wfds, _xfds = select(fds, [], []) + if os.get_blocking(master_fd): + # If we write more than tty/ndisc is willing to buffer, we may block + # indefinitely. So we set master_fd to non-blocking temporarily during + # the copy operation. + os.set_blocking(master_fd, False) + try: + _copy(master_fd, master_read=master_read, stdin_read=stdin_read) + finally: + # restore blocking mode for backwards compatibility + os.set_blocking(master_fd, True) + return + high_waterlevel = 4096 + stdin_avail = master_fd != STDIN_FILENO + stdout_avail = master_fd != STDOUT_FILENO + i_buf = b'' + o_buf = b'' + while 1: + rfds = [] + wfds = [] + if stdin_avail and len(i_buf) < high_waterlevel: + rfds.append(STDIN_FILENO) + if stdout_avail and len(o_buf) < high_waterlevel: + rfds.append(master_fd) + if stdout_avail and len(o_buf) > 0: + wfds.append(STDOUT_FILENO) + if len(i_buf) > 0: + wfds.append(master_fd) + + rfds, wfds, _xfds = select(rfds, wfds, []) + + if STDOUT_FILENO in wfds: + try: + n = os.write(STDOUT_FILENO, o_buf) + o_buf = o_buf[n:] + except OSError: + stdout_avail = False if master_fd in rfds: # Some OSes signal EOF by returning an empty byte string, @@ -150,19 +171,22 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): if not data: # Reached EOF. return # Assume the child process has exited and is # unreachable, so we clean up. - else: - os.write(STDOUT_FILENO, data) + o_buf += data + + if master_fd in wfds: + n = os.write(master_fd, i_buf) + i_buf = i_buf[n:] - if STDIN_FILENO in rfds: + if stdin_avail and STDIN_FILENO in rfds: data = stdin_read(STDIN_FILENO) if not data: - fds.remove(STDIN_FILENO) + stdin_avail = False else: - _writen(master_fd, data) + i_buf += data def spawn(argv, master_read=_read, stdin_read=_read): """Create a spawned process.""" - if type(argv) == type(''): + if isinstance(argv, str): argv = (argv,) sys.audit('pty.spawn', argv) diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index b1c1f6abff..c1728f5019 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -1,10 +1,16 @@ -from test import support -from test.support import verbose, reap_children +import unittest +from test.support import ( + is_android, is_apple_mobile, is_emscripten, is_wasi, reap_children, verbose +) from test.support.import_helper import import_module +from test.support.os_helper import TESTFN, unlink # Skip these tests if termios is not available import_module('termios') +if is_android or is_apple_mobile or is_emscripten or is_wasi: + raise unittest.SkipTest("pty is not available on this platform") + import errno import os import pty @@ -14,21 +20,12 @@ import signal import socket import io # readline -import unittest - -import struct -import fcntl import warnings TEST_STRING_1 = b"I wish to buy a fish license.\n" TEST_STRING_2 = b"For my pet fish, Eric.\n" -try: - _TIOCGWINSZ = tty.TIOCGWINSZ - _TIOCSWINSZ = tty.TIOCSWINSZ - _HAVE_WINSZ = True -except AttributeError: - _HAVE_WINSZ = False +_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ") if verbose: def debug(msg): @@ -82,46 +79,32 @@ def expectedFailureIfStdinIsTTY(fun): pass return fun -def _get_term_winsz(fd): - s = struct.pack("HHHH", 0, 0, 0, 0) - return fcntl.ioctl(fd, _TIOCGWINSZ, s) -def _set_term_winsz(fd, winsz): - fcntl.ioctl(fd, _TIOCSWINSZ, winsz) +def write_all(fd, data): + written = os.write(fd, data) + if written != len(data): + # gh-73256, gh-110673: It should never happen, but check just in case + raise Exception(f"short write: os.write({fd}, {len(data)} bytes) " + f"wrote {written} bytes") # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing # because pty code is not too portable. class PtyTest(unittest.TestCase): def setUp(self): - old_alarm = signal.signal(signal.SIGALRM, self.handle_sig) - self.addCleanup(signal.signal, signal.SIGALRM, old_alarm) - old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup) self.addCleanup(signal.signal, signal.SIGHUP, old_sighup) - # isatty() and close() can hang on some platforms. Set an alarm - # before running the test to make sure we don't hang forever. - self.addCleanup(signal.alarm, 0) - signal.alarm(10) - - # Save original stdin window size - self.stdin_rows = None - self.stdin_cols = None + # Save original stdin window size. + self.stdin_dim = None if _HAVE_WINSZ: try: - stdin_dim = os.get_terminal_size(pty.STDIN_FILENO) - self.stdin_rows = stdin_dim.lines - self.stdin_cols = stdin_dim.columns - old_stdin_winsz = struct.pack("HHHH", self.stdin_rows, - self.stdin_cols, 0, 0) - self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz) - except OSError: + self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO) + self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO, + self.stdin_dim) + except tty.error: pass - def handle_sig(self, sig, frame): - self.fail("isatty hung") - @staticmethod def handle_sighup(signum, frame): pass @@ -131,41 +114,42 @@ def test_openpty(self): try: mode = tty.tcgetattr(pty.STDIN_FILENO) except tty.error: - # not a tty or bad/closed fd + # Not a tty or bad/closed fd. debug("tty.tcgetattr(pty.STDIN_FILENO) failed") mode = None - new_stdin_winsz = None - if self.stdin_rows is not None and self.stdin_cols is not None: + new_dim = None + if self.stdin_dim: try: # Modify pty.STDIN_FILENO window size; we need to # check if pty.openpty() is able to set pty slave # window size accordingly. - debug("Setting pty.STDIN_FILENO window size") - debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})") - target_stdin_rows = self.stdin_rows + 1 - target_stdin_cols = self.stdin_cols + 1 - debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})") - target_stdin_winsz = struct.pack("HHHH", target_stdin_rows, - target_stdin_cols, 0, 0) - _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz) + debug("Setting pty.STDIN_FILENO window size.") + debug(f"original size: (row, col) = {self.stdin_dim}") + target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1) + debug(f"target size: (row, col) = {target_dim}") + tty.tcsetwinsize(pty.STDIN_FILENO, target_dim) # Were we able to set the window size # of pty.STDIN_FILENO successfully? - new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO) - self.assertEqual(new_stdin_winsz, target_stdin_winsz, + new_dim = tty.tcgetwinsize(pty.STDIN_FILENO) + self.assertEqual(new_dim, target_dim, "pty.STDIN_FILENO window size unchanged") - except OSError: - warnings.warn("Failed to set pty.STDIN_FILENO window size") + except OSError as e: + logging.getLogger(__name__).warning( + "Failed to set pty.STDIN_FILENO window size.", exc_info=e, + ) pass try: debug("Calling pty.openpty()") try: - master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz) + master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim, + True) except TypeError: master_fd, slave_fd = pty.openpty() - debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'") + slave_name = None + debug(f"Got {master_fd=}, {slave_fd=}, {slave_name=}") except OSError: # " An optional feature could not be imported " ... ? raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.") @@ -181,8 +165,8 @@ def test_openpty(self): if mode: self.assertEqual(tty.tcgetattr(slave_fd), mode, "openpty() failed to set slave termios") - if new_stdin_winsz: - self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz, + if new_dim: + self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim, "openpty() failed to set slave window size") # Ensure the fd is non-blocking in case there's nothing to read. @@ -200,18 +184,17 @@ def test_openpty(self): os.set_blocking(master_fd, blocking) debug("Writing to slave_fd") - os.write(slave_fd, TEST_STRING_1) + write_all(slave_fd, TEST_STRING_1) s1 = _readline(master_fd) self.assertEqual(b'I wish to buy a fish license.\n', normalize_output(s1)) debug("Writing chunked output") - os.write(slave_fd, TEST_STRING_2[:5]) - os.write(slave_fd, TEST_STRING_2[5:]) + write_all(slave_fd, TEST_STRING_2[:5]) + write_all(slave_fd, TEST_STRING_2[5:]) s2 = _readline(master_fd) self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2)) - @support.requires_fork() def test_fork(self): debug("calling pty.fork()") pid, master_fd = pty.fork() @@ -314,7 +297,26 @@ def test_master_read(self): self.assertEqual(data, b"") def test_spawn_doesnt_hang(self): - pty.spawn([sys.executable, '-c', 'print("hi there")']) + self.addCleanup(unlink, TESTFN) + with open(TESTFN, 'wb') as f: + STDOUT_FILENO = 1 + dup_stdout = os.dup(STDOUT_FILENO) + os.dup2(f.fileno(), STDOUT_FILENO) + buf = b'' + def master_read(fd): + nonlocal buf + data = os.read(fd, 1024) + buf += data + return data + try: + pty.spawn([sys.executable, '-c', 'print("hi there")'], + master_read) + finally: + os.dup2(dup_stdout, STDOUT_FILENO) + os.close(dup_stdout) + self.assertEqual(buf, b'hi there\r\n') + with open(TESTFN, 'rb') as f: + self.assertEqual(f.read(), b'hi there\r\n') class SmallPtyTests(unittest.TestCase): """These tests don't spawn children or hang.""" @@ -332,8 +334,8 @@ def setUp(self): self.orig_pty_waitpid = pty.waitpid self.fds = [] # A list of file descriptors to close. self.files = [] - self.select_rfds_lengths = [] - self.select_rfds_results = [] + self.select_input = [] + self.select_output = [] self.tcsetattr_mode_setting = None def tearDown(self): @@ -368,11 +370,10 @@ def _socketpair(self): self.files.extend(socketpair) return socketpair - def _mock_select(self, rfds, wfds, xfds, timeout=0): + def _mock_select(self, rfds, wfds, xfds): # This will raise IndexError when no more expected calls exist. - # This ignores the timeout - self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) - return self.select_rfds_results.pop(0), [], [] + self.assertEqual((rfds, wfds, xfds), self.select_input.pop(0)) + return self.select_output.pop(0) def _make_mock_fork(self, pid): def mock_fork(): @@ -392,14 +393,16 @@ def test__copy_to_each(self): masters = [s.fileno() for s in socketpair] # Feed data. Smaller than PIPEBUF. These writes will not block. - os.write(masters[1], b'from master') - os.write(write_to_stdin_fd, b'from stdin') + write_all(masters[1], b'from master') + write_all(write_to_stdin_fd, b'from stdin') - # Expect two select calls, the last one will cause IndexError + # Expect three select calls, the last one will cause IndexError pty.select = self._mock_select - self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - self.select_rfds_lengths.append(2) + self.select_input.append(([mock_stdin_fd, masters[0]], [], [])) + self.select_output.append(([mock_stdin_fd, masters[0]], [], [])) + self.select_input.append(([mock_stdin_fd, masters[0]], [mock_stdout_fd, masters[0]], [])) + self.select_output.append(([], [mock_stdout_fd, masters[0]], [])) + self.select_input.append(([mock_stdin_fd, masters[0]], [], [])) with self.assertRaises(IndexError): pty._copy(masters[0]) @@ -410,28 +413,6 @@ def test__copy_to_each(self): self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') self.assertEqual(os.read(masters[1], 20), b'from stdin') - def test__copy_eof_on_all(self): - """Test the empty read EOF case on both master_fd and stdin.""" - read_from_stdout_fd, mock_stdout_fd = self._pipe() - pty.STDOUT_FILENO = mock_stdout_fd - mock_stdin_fd, write_to_stdin_fd = self._pipe() - pty.STDIN_FILENO = mock_stdin_fd - socketpair = self._socketpair() - masters = [s.fileno() for s in socketpair] - - socketpair[1].close() - os.close(write_to_stdin_fd) - - pty.select = self._mock_select - self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - # We expect that both fds were removed from the fds list as they - # both encountered an EOF before the second select call. - self.select_rfds_lengths.append(0) - - # We expect the function to return without error. - self.assertEqual(pty._copy(masters[0]), None) - def test__restore_tty_mode_normal_return(self): """Test that spawn resets the tty mode no when _copy returns normally."""