Skip to content

Update pty to 3.13.3 #5738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions Lib/pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def master_open():
Open a pty master and return the fd, and the filename of the slave end.
Deprecated, use openpty() instead."""

import warnings
warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14

try:
master_fd, slave_fd = os.openpty()
except (AttributeError, OSError):
Expand Down Expand Up @@ -69,6 +72,9 @@ def slave_open(tty_name):
opened filedescriptor.
Deprecated, use openpty() instead."""

import warnings
warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14

result = os.open(tty_name, os.O_RDWR)
try:
from fcntl import ioctl, I_PUSH
Expand Down Expand Up @@ -101,32 +107,14 @@ def fork():
master_fd, slave_fd = openpty()
pid = os.fork()
if pid == CHILD:
# Establish a new session.
os.setsid()
os.close(master_fd)

# Slave becomes stdin/stdout/stderr of child.
os.dup2(slave_fd, STDIN_FILENO)
os.dup2(slave_fd, STDOUT_FILENO)
os.dup2(slave_fd, STDERR_FILENO)
if slave_fd > STDERR_FILENO:
os.close(slave_fd)

# Explicitly open the tty to make it become a controlling tty.
tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
os.close(tmp_fd)
os.login_tty(slave_fd)
else:
os.close(slave_fd)

# Parent and child process.
return pid, master_fd

def _writen(fd, data):
"""Write all the data to a descriptor."""
while data:
n = os.write(fd, data)
data = data[n:]

def _read(fd):
"""Default read function."""
return os.read(fd, 1024)
Expand All @@ -136,9 +124,42 @@ def _copy(master_fd, master_read=_read, stdin_read=_read):
Copies
pty master -> standard output (master_read)
standard input -> pty master (stdin_read)"""
fds = [master_fd, STDIN_FILENO]
while fds:
rfds, _wfds, _xfds = select(fds, [], [])
if os.get_blocking(master_fd):
# If we write more than tty/ndisc is willing to buffer, we may block
# indefinitely. So we set master_fd to non-blocking temporarily during
# the copy operation.
os.set_blocking(master_fd, False)
try:
_copy(master_fd, master_read=master_read, stdin_read=stdin_read)
finally:
# restore blocking mode for backwards compatibility
os.set_blocking(master_fd, True)
return
high_waterlevel = 4096
stdin_avail = master_fd != STDIN_FILENO
stdout_avail = master_fd != STDOUT_FILENO
i_buf = b''
o_buf = b''
while 1:
rfds = []
wfds = []
if stdin_avail and len(i_buf) < high_waterlevel:
rfds.append(STDIN_FILENO)
if stdout_avail and len(o_buf) < high_waterlevel:
rfds.append(master_fd)
if stdout_avail and len(o_buf) > 0:
wfds.append(STDOUT_FILENO)
if len(i_buf) > 0:
wfds.append(master_fd)

rfds, wfds, _xfds = select(rfds, wfds, [])

if STDOUT_FILENO in wfds:
try:
n = os.write(STDOUT_FILENO, o_buf)
o_buf = o_buf[n:]
except OSError:
stdout_avail = False

if master_fd in rfds:
# Some OSes signal EOF by returning an empty byte string,
Expand All @@ -150,19 +171,22 @@ def _copy(master_fd, master_read=_read, stdin_read=_read):
if not data: # Reached EOF.
return # Assume the child process has exited and is
# unreachable, so we clean up.
else:
os.write(STDOUT_FILENO, data)
o_buf += data

if master_fd in wfds:
n = os.write(master_fd, i_buf)
i_buf = i_buf[n:]

if STDIN_FILENO in rfds:
if stdin_avail and STDIN_FILENO in rfds:
data = stdin_read(STDIN_FILENO)
if not data:
fds.remove(STDIN_FILENO)
stdin_avail = False
else:
_writen(master_fd, data)
i_buf += data

def spawn(argv, master_read=_read, stdin_read=_read):
"""Create a spawned process."""
if type(argv) == type(''):
if isinstance(argv, str):
argv = (argv,)
sys.audit('pty.spawn', argv)

Expand Down
Loading
Loading