Skip to content

gh-119517: Fixes for pasting in pyrepl #120253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 11, 2024
3 changes: 0 additions & 3 deletions Lib/_pyrepl/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,6 @@ def do(self) -> None:
r = self.reader
text = self.event * r.get_arg()
r.insert(text)
if len(text) == 1 and r.pos == len(r.buffer):
r.calc_screen = r.append_to_screen


class insert_nl(EditCommand):
Expand Down Expand Up @@ -483,4 +481,3 @@ def do(self) -> None:
self.reader.paste_mode = False
self.reader.in_bracketed_paste = False
self.reader.dirty = True
self.reader.calc_screen = self.reader.calc_complete_screen
8 changes: 2 additions & 6 deletions Lib/_pyrepl/completing_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ def do(self) -> None:
r = self.reader # type: ignore[assignment]

commands.self_insert.do(self)

if r.cmpltn_menu_visible or r.cmpltn_message_visible:
r.calc_screen = r.calc_complete_screen

if r.cmpltn_menu_visible:
stem = r.get_stem()
if len(stem) < 1:
Expand Down Expand Up @@ -261,8 +257,8 @@ def after_command(self, cmd: Command) -> None:
if not isinstance(cmd, (complete, self_insert)):
self.cmpltn_reset()

def calc_complete_screen(self) -> list[str]:
screen = super().calc_complete_screen()
def calc_screen(self) -> list[str]:
screen = super().calc_screen()
if self.cmpltn_menu_visible:
ly = self.lxy[1]
screen[ly:ly] = self.cmpltn_menu
Expand Down
154 changes: 109 additions & 45 deletions Lib/_pyrepl/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@
# types
Command = commands.Command
if False:
from typing import Callable
from .types import Callback, SimpleContextManager, KeySpec, CommandName
CalcScreen = Callable[[], list[str]]


def disp_str(buffer: str) -> tuple[str, list[int]]:
"""disp_str(buffer:string) -> (string, [int])

Return the string that should be the printed represenation of
Return the string that should be the printed representation of
|buffer| and a list detailing where the characters of |buffer|
get used up. E.g.:

Expand All @@ -54,11 +52,17 @@ def disp_str(buffer: str) -> tuple[str, list[int]]:
b: list[int] = []
s: list[str] = []
for c in buffer:
if ord(c) > 128 and unicodedata.category(c).startswith("C"):
if ord(c) < 128:
s.append(c)
b.append(1)
elif unicodedata.category(c).startswith("C"):
c = r"\u%04x" % ord(c)
s.append(c)
b.append(wlen(c))
b.extend([0] * (len(c) - 1))
s.append(c)
b.append(str_width(c))
b.extend([0] * (len(c) - 1))
else:
s.append(c)
b.append(str_width(c))
return "".join(s), b


Expand Down Expand Up @@ -230,16 +234,59 @@ class Reader:
commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
last_command: type[Command] | None = None
syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
msg_at_bottom: bool = True
keymap: tuple[tuple[str, str], ...] = ()
input_trans: input.KeymapTranslator = field(init=False)
input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list)
screen: list[str] = field(default_factory=list)
screeninfo: list[tuple[int, list[int]]] = field(init=False)
cxy: tuple[int, int] = field(init=False)
lxy: tuple[int, int] = field(init=False)
calc_screen: CalcScreen = field(init=False)
scheduled_commands: list[str] = field(default_factory=list)
can_colorize: bool = False

## cached metadata to speed up screen refreshes
@dataclass
class RefreshCache:
in_bracketed_paste: bool = False
screen: list[str] = field(default_factory=list)
screeninfo: list[tuple[int, list[int]]] = field(init=False)
line_end_offsets: list[int] = field(default_factory=list)
pos: int = field(init=False)
cxy: tuple[int, int] = field(init=False)
dimensions: tuple[int, int] = field(init=False)

def update_cache(self,
reader: Reader,
screen: list[str],
screeninfo: list[tuple[int, list[int]]],
) -> None:
self.in_bracketed_paste = reader.in_bracketed_paste
self.screen = screen.copy()
self.screeninfo = screeninfo.copy()
self.pos = reader.pos
self.cxy = reader.cxy
self.dimensions = reader.console.width, reader.console.height

def valid(self, reader: Reader) -> bool:
dimensions = reader.console.width, reader.console.height
dimensions_changed = dimensions != self.dimensions
paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
return not (dimensions_changed or paste_changed)

def get_cached_location(self, reader: Reader) -> tuple[int, int]:
offset = 0
earliest_common_pos = min(reader.pos, self.pos)
num_common_lines = len(self.line_end_offsets)
while num_common_lines > 0:
offset = self.line_end_offsets[num_common_lines - 1]
if earliest_common_pos > offset:
break
num_common_lines -= 1
else:
offset = 0
return offset, num_common_lines

last_refresh_cache: RefreshCache = field(default_factory=RefreshCache)

def __post_init__(self) -> None:
# Enable the use of `insert` without a `prepare` call - necessary to
Expand All @@ -252,60 +299,69 @@ def __post_init__(self) -> None:
self.screeninfo = [(0, [])]
self.cxy = self.pos2xy()
self.lxy = (self.pos, 0)
self.calc_screen = self.calc_complete_screen
self.can_colorize = can_colorize()

self.last_refresh_cache.screeninfo = self.screeninfo
self.last_refresh_cache.pos = self.pos
self.last_refresh_cache.cxy = self.cxy
self.last_refresh_cache.dimensions = (0, 0)

def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
return default_keymap

def append_to_screen(self) -> list[str]:
new_screen = self.screen.copy() or ['']
def calc_screen(self) -> list[str]:
"""Translate changes in self.buffer into changes in self.console.screen."""
# Since the last call to calc_screen:
# screen and screeninfo may differ due to a completion menu being shown
# pos and cxy may differ due to edits, cursor movements, or completion menus

new_character = self.buffer[-1]
new_character_len = wlen(new_character)
# Lines that are above both the old and new cursor position can't have changed,
# unless the terminal has been resized (which might cause reflowing) or we've
# entered or left paste mode (which changes prompts, causing reflowing).
num_common_lines = 0
offset = 0
if self.last_refresh_cache.valid(self):
offset, num_common_lines = self.last_refresh_cache.get_cached_location(self)

last_line_len = wlen(new_screen[-1])
if last_line_len + new_character_len >= self.console.width: # We need to wrap here
new_screen[-1] += '\\'
self.screeninfo[-1][1].append(1)
new_screen.append(self.buffer[-1])
self.screeninfo.append((0, [new_character_len]))
else:
new_screen[-1] += self.buffer[-1]
self.screeninfo[-1][1].append(new_character_len)
self.cxy = self.pos2xy()
screen = self.last_refresh_cache.screen
del screen[num_common_lines:]

# Reset the function that is used for completing the screen
self.calc_screen = self.calc_complete_screen
return new_screen
screeninfo = self.last_refresh_cache.screeninfo
del screeninfo[num_common_lines:]

last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets
del last_refresh_line_end_offsets[num_common_lines:]

def calc_complete_screen(self) -> list[str]:
"""The purpose of this method is to translate changes in
self.buffer into changes in self.screen. Currently it rips
everything down and starts from scratch, which whilst not
especially efficient is certainly simple(r).
"""
lines = self.get_unicode().split("\n")
screen: list[str] = []
screeninfo: list[tuple[int, list[int]]] = []
pos = self.pos
for ln, line in enumerate(lines):
pos -= offset

lines = "".join(self.buffer[offset:]).split("\n")
cursor_found = False
lines_beyond_cursor = 0
for ln, line in enumerate(lines, num_common_lines):
ll = len(line)
if 0 <= pos <= ll:
if self.msg and not self.msg_at_bottom:
for mline in self.msg.split("\n"):
screen.append(mline)
screeninfo.append((0, []))
self.lxy = pos, ln
cursor_found = True
elif cursor_found:
lines_beyond_cursor += 1
if lines_beyond_cursor > self.console.height:
# No need to keep formatting lines.
# The console can't show them.
break
prompt = self.get_prompt(ln, ll >= pos >= 0)
while "\n" in prompt:
pre_prompt, _, prompt = prompt.partition("\n")
last_refresh_line_end_offsets.append(offset)
screen.append(pre_prompt)
screeninfo.append((0, []))
pos -= ll + 1
prompt, lp = self.process_prompt(prompt)
l, l2 = disp_str(line)
wrapcount = (wlen(l) + lp) // self.console.width
if wrapcount == 0:
offset += ll + 1 # Takes all of the line plus the newline
last_refresh_line_end_offsets.append(offset)
screen.append(prompt + l)
screeninfo.append((lp, l2))
else:
Expand All @@ -321,22 +377,27 @@ def calc_complete_screen(self) -> list[str]:
column += character_width
pre = prompt if i == 0 else ""
if len(l) > index_to_wrap_before:
offset += index_to_wrap_before
post = "\\"
after = [1]
else:
offset += index_to_wrap_before + 1 # Takes the newline
post = ""
after = []
last_refresh_line_end_offsets.append(offset)
screen.append(pre + l[:index_to_wrap_before] + post)
screeninfo.append((prelen, l2[:index_to_wrap_before] + after))
l = l[index_to_wrap_before:]
l2 = l2[index_to_wrap_before:]
i += 1
self.screeninfo = screeninfo
self.cxy = self.pos2xy()
if self.msg and self.msg_at_bottom:
if self.msg:
for mline in self.msg.split("\n"):
screen.append(mline)
screeninfo.append((0, []))

self.last_refresh_cache.update_cache(self, screen, screeninfo)
return screen

@staticmethod
Expand Down Expand Up @@ -456,7 +517,7 @@ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
'lineno'."""
if self.arg is not None and cursor_on_line:
prompt = f"(arg: {self.arg}) "
elif self.paste_mode:
elif self.paste_mode and not self.in_bracketed_paste:
prompt = "(paste) "
elif "\n" in self.buffer:
if lineno == 0:
Expand All @@ -468,7 +529,7 @@ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
else:
prompt = self.ps1

if can_colorize():
if self.can_colorize:
prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
return prompt

Expand Down Expand Up @@ -604,6 +665,9 @@ def update_screen(self) -> None:

def refresh(self) -> None:
"""Recalculate and refresh the screen."""
if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
return

# this call sets up self.cxy, so call it first.
self.screen = self.calc_screen()
self.console.refresh(self.screen, self.cxy)
Expand All @@ -627,7 +691,7 @@ def do_cmd(self, cmd: tuple[str, list[str]]) -> None:

self.after_command(command)

if self.dirty and not self.in_bracketed_paste:
if self.dirty:
self.refresh()
else:
self.update_cursor()
Expand Down
4 changes: 4 additions & 0 deletions Lib/_pyrepl/readline.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ def do(self) -> None:
r = self.reader # type: ignore[assignment]
r.dirty = True # this is needed to hide the completion menu, if visible

if self.reader.in_bracketed_paste:
r.insert("\n")
return

# if there are already several lines and the cursor
# is not on the last one, always insert a new \n.
text = r.get_unicode()
Expand Down
20 changes: 17 additions & 3 deletions Lib/_pyrepl/unix_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ def __init__(

self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
self.input_buffer = b""
self.input_buffer_pos = 0
curses.setupterm(term or None, self.output_fd)
self.term = term

Expand Down Expand Up @@ -197,6 +199,18 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
self.event_queue = EventQueue(self.input_fd, self.encoding)
self.cursor_visible = 1

def __read(self, n: int) -> bytes:
if not self.input_buffer or self.input_buffer_pos >= len(self.input_buffer):
self.input_buffer = os.read(self.input_fd, 10000)

ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
self.input_buffer_pos += len(ret)
if self.input_buffer_pos >= len(self.input_buffer):
self.input_buffer = b""
self.input_buffer_pos = 0
return ret


def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
Expand Down Expand Up @@ -373,7 +387,7 @@ def get_event(self, block: bool = True) -> Event | None:
while self.event_queue.empty():
while True:
try:
self.push_char(os.read(self.input_fd, 1))
self.push_char(self.__read(1))
except OSError as err:
if err.errno == errno.EINTR:
if not self.event_queue.empty():
Expand Down Expand Up @@ -491,7 +505,7 @@ def getpending(self):
e.raw += e.raw

amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
raw = os.read(self.input_fd, amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
Expand All @@ -514,7 +528,7 @@ def getpending(self):
e.raw += e.raw

amount = 10000
raw = os.read(self.input_fd, amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
Expand Down
2 changes: 2 additions & 0 deletions Lib/_pyrepl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def str_width(c: str) -> int:


def wlen(s: str) -> int:
if len(s) == 1:
return str_width(s)
length = sum(str_width(i) for i in s)
# remove lengths of any escape sequences
sequence = ANSI_ESCAPE_SEQUENCE.findall(s)
Expand Down
Loading