From 5a8a4059d646fd313e81365ef240562556d8bd3f Mon Sep 17 00:00:00 2001 From: Eliah Kagan Date: Fri, 6 Jun 2025 23:16:45 -0400 Subject: [PATCH] Refactor Git.{AutoInterrupt,CatFileContentStream} nesting This makes `Git.AutoInterrupt` and `Git.CatFileContentStream` transparent aliases to top-level nonpublic `_AutoInterrupt` and `_CatFileContentStream` classes in the `cmd` module. This does not change the "public" interface. It also does not change metadata relevant to documentation: the `__name__` and `__qualname__` attributes are set explicitly to the values they had before when these classes were defined nested, so that Sphinx continues to document them (and to do so in full) in `Git` and as `Git.AutoInterrupt` and `Git.CatFileContentStream`. The purpose of this is to increase readability. The `Git` class is big and complex, with a number of long members and various forms of nesting. Since these two classes can be understood even without reading the code of the `Git` class, moving the definitions out of the `Git` class into top-level nonpublic classes will hopefully increase readability and help with maintenance. --- git/cmd.py | 440 +++++++++++++++++++++++++++-------------------------- 1 file changed, 226 insertions(+), 214 deletions(-) diff --git a/git/cmd.py b/git/cmd.py index 7f46edc8f..a6195880d 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -308,6 +308,230 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc ## -- End Utilities -- @} + +class _AutoInterrupt: + """Process wrapper that terminates the wrapped process on finalization. + + This kills/interrupts the stored process instance once this instance goes out of + scope. It is used to prevent processes piling up in case iterators stop reading. + + All attributes are wired through to the contained process object. + + The wait method is overridden to perform automatic status code checking and possibly + raise. + """ + + __slots__ = ("proc", "args", "status") + + # If this is non-zero it will override any status code during _terminate, used + # to prevent race conditions in testing. + _status_code_if_terminate: int = 0 + + def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None: + self.proc = proc + self.args = args + self.status: Union[int, None] = None + + def _terminate(self) -> None: + """Terminate the underlying process.""" + if self.proc is None: + return + + proc = self.proc + self.proc = None + if proc.stdin: + proc.stdin.close() + if proc.stdout: + proc.stdout.close() + if proc.stderr: + proc.stderr.close() + # Did the process finish already so we have a return code? + try: + if proc.poll() is not None: + self.status = self._status_code_if_terminate or proc.poll() + return + except OSError as ex: + _logger.info("Ignored error after process had died: %r", ex) + + # It can be that nothing really exists anymore... + if os is None or getattr(os, "kill", None) is None: + return + + # Try to kill it. + try: + proc.terminate() + status = proc.wait() # Ensure the process goes away. + + self.status = self._status_code_if_terminate or status + except OSError as ex: + _logger.info("Ignored error after process had died: %r", ex) + # END exception handling + + def __del__(self) -> None: + self._terminate() + + def __getattr__(self, attr: str) -> Any: + return getattr(self.proc, attr) + + # TODO: Bad choice to mimic `proc.wait()` but with different args. + def wait(self, stderr: Union[None, str, bytes] = b"") -> int: + """Wait for the process and return its status code. + + :param stderr: + Previously read value of stderr, in case stderr is already closed. + + :warn: + May deadlock if output or error pipes are used and not handled separately. + + :raise git.exc.GitCommandError: + If the return status is not 0. + """ + if stderr is None: + stderr_b = b"" + stderr_b = force_bytes(data=stderr, encoding="utf-8") + status: Union[int, None] + if self.proc is not None: + status = self.proc.wait() + p_stderr = self.proc.stderr + else: # Assume the underlying proc was killed earlier or never existed. + status = self.status + p_stderr = None + + def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: + if stream: + try: + return stderr_b + force_bytes(stream.read()) + except (OSError, ValueError): + return stderr_b or b"" + else: + return stderr_b or b"" + + # END status handling + + if status != 0: + errstr = read_all_from_possibly_closed_stream(p_stderr) + _logger.debug("AutoInterrupt wait stderr: %r" % (errstr,)) + raise GitCommandError(remove_password_if_present(self.args), status, errstr) + return status + + +_AutoInterrupt.__name__ = "AutoInterrupt" +_AutoInterrupt.__qualname__ = "Git.AutoInterrupt" + + +class _CatFileContentStream: + """Object representing a sized read-only stream returning the contents of + an object. + + This behaves like a stream, but counts the data read and simulates an empty stream + once our sized content region is empty. + + If not all data are read to the end of the object's lifetime, we read the rest to + ensure the underlying stream continues to work. + """ + + __slots__ = ("_stream", "_nbr", "_size") + + def __init__(self, size: int, stream: IO[bytes]) -> None: + self._stream = stream + self._size = size + self._nbr = 0 # Number of bytes read. + + # Special case: If the object is empty, has null bytes, get the final + # newline right away. + if size == 0: + stream.read(1) + # END handle empty streams + + def read(self, size: int = -1) -> bytes: + bytes_left = self._size - self._nbr + if bytes_left == 0: + return b"" + if size > -1: + # Ensure we don't try to read past our limit. + size = min(bytes_left, size) + else: + # They try to read all, make sure it's not more than what remains. + size = bytes_left + # END check early depletion + data = self._stream.read(size) + self._nbr += len(data) + + # Check for depletion, read our final byte to make the stream usable by + # others. + if self._size - self._nbr == 0: + self._stream.read(1) # final newline + # END finish reading + return data + + def readline(self, size: int = -1) -> bytes: + if self._nbr == self._size: + return b"" + + # Clamp size to lowest allowed value. + bytes_left = self._size - self._nbr + if size > -1: + size = min(bytes_left, size) + else: + size = bytes_left + # END handle size + + data = self._stream.readline(size) + self._nbr += len(data) + + # Handle final byte. + if self._size - self._nbr == 0: + self._stream.read(1) + # END finish reading + + return data + + def readlines(self, size: int = -1) -> List[bytes]: + if self._nbr == self._size: + return [] + + # Leave all additional logic to our readline method, we just check the size. + out = [] + nbr = 0 + while True: + line = self.readline() + if not line: + break + out.append(line) + if size > -1: + nbr += len(line) + if nbr > size: + break + # END handle size constraint + # END readline loop + return out + + # skipcq: PYL-E0301 + def __iter__(self) -> "Git.CatFileContentStream": + return self + + def __next__(self) -> bytes: + line = self.readline() + if not line: + raise StopIteration + + return line + + next = __next__ + + def __del__(self) -> None: + bytes_left = self._size - self._nbr + if bytes_left: + # Read and discard - seeking is impossible within a stream. + # This includes any terminating newline. + self._stream.read(bytes_left + 1) + # END handle incomplete read + + +_CatFileContentStream.__name__ = "CatFileContentStream" +_CatFileContentStream.__qualname__ = "Git.CatFileContentStream" + + _USE_SHELL_DEFAULT_MESSAGE = ( "Git.USE_SHELL is deprecated, because only its default value of False is safe. " "It will be removed in a future release." @@ -728,221 +952,9 @@ def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it." ) - class AutoInterrupt: - """Process wrapper that terminates the wrapped process on finalization. - - This kills/interrupts the stored process instance once this instance goes out of - scope. It is used to prevent processes piling up in case iterators stop reading. - - All attributes are wired through to the contained process object. - - The wait method is overridden to perform automatic status code checking and - possibly raise. - """ - - __slots__ = ("proc", "args", "status") - - # If this is non-zero it will override any status code during _terminate, used - # to prevent race conditions in testing. - _status_code_if_terminate: int = 0 - - def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None: - self.proc = proc - self.args = args - self.status: Union[int, None] = None - - def _terminate(self) -> None: - """Terminate the underlying process.""" - if self.proc is None: - return - - proc = self.proc - self.proc = None - if proc.stdin: - proc.stdin.close() - if proc.stdout: - proc.stdout.close() - if proc.stderr: - proc.stderr.close() - # Did the process finish already so we have a return code? - try: - if proc.poll() is not None: - self.status = self._status_code_if_terminate or proc.poll() - return - except OSError as ex: - _logger.info("Ignored error after process had died: %r", ex) - - # It can be that nothing really exists anymore... - if os is None or getattr(os, "kill", None) is None: - return - - # Try to kill it. - try: - proc.terminate() - status = proc.wait() # Ensure the process goes away. - - self.status = self._status_code_if_terminate or status - except OSError as ex: - _logger.info("Ignored error after process had died: %r", ex) - # END exception handling - - def __del__(self) -> None: - self._terminate() - - def __getattr__(self, attr: str) -> Any: - return getattr(self.proc, attr) - - # TODO: Bad choice to mimic `proc.wait()` but with different args. - def wait(self, stderr: Union[None, str, bytes] = b"") -> int: - """Wait for the process and return its status code. - - :param stderr: - Previously read value of stderr, in case stderr is already closed. - - :warn: - May deadlock if output or error pipes are used and not handled - separately. - - :raise git.exc.GitCommandError: - If the return status is not 0. - """ - if stderr is None: - stderr_b = b"" - stderr_b = force_bytes(data=stderr, encoding="utf-8") - status: Union[int, None] - if self.proc is not None: - status = self.proc.wait() - p_stderr = self.proc.stderr - else: # Assume the underlying proc was killed earlier or never existed. - status = self.status - p_stderr = None - - def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: - if stream: - try: - return stderr_b + force_bytes(stream.read()) - except (OSError, ValueError): - return stderr_b or b"" - else: - return stderr_b or b"" - - # END status handling - - if status != 0: - errstr = read_all_from_possibly_closed_stream(p_stderr) - _logger.debug("AutoInterrupt wait stderr: %r" % (errstr,)) - raise GitCommandError(remove_password_if_present(self.args), status, errstr) - return status - - # END auto interrupt - - class CatFileContentStream: - """Object representing a sized read-only stream returning the contents of - an object. - - This behaves like a stream, but counts the data read and simulates an empty - stream once our sized content region is empty. - - If not all data are read to the end of the object's lifetime, we read the - rest to ensure the underlying stream continues to work. - """ - - __slots__ = ("_stream", "_nbr", "_size") - - def __init__(self, size: int, stream: IO[bytes]) -> None: - self._stream = stream - self._size = size - self._nbr = 0 # Number of bytes read. - - # Special case: If the object is empty, has null bytes, get the final - # newline right away. - if size == 0: - stream.read(1) - # END handle empty streams - - def read(self, size: int = -1) -> bytes: - bytes_left = self._size - self._nbr - if bytes_left == 0: - return b"" - if size > -1: - # Ensure we don't try to read past our limit. - size = min(bytes_left, size) - else: - # They try to read all, make sure it's not more than what remains. - size = bytes_left - # END check early depletion - data = self._stream.read(size) - self._nbr += len(data) - - # Check for depletion, read our final byte to make the stream usable by - # others. - if self._size - self._nbr == 0: - self._stream.read(1) # final newline - # END finish reading - return data - - def readline(self, size: int = -1) -> bytes: - if self._nbr == self._size: - return b"" - - # Clamp size to lowest allowed value. - bytes_left = self._size - self._nbr - if size > -1: - size = min(bytes_left, size) - else: - size = bytes_left - # END handle size - - data = self._stream.readline(size) - self._nbr += len(data) - - # Handle final byte. - if self._size - self._nbr == 0: - self._stream.read(1) - # END finish reading - - return data - - def readlines(self, size: int = -1) -> List[bytes]: - if self._nbr == self._size: - return [] - - # Leave all additional logic to our readline method, we just check the size. - out = [] - nbr = 0 - while True: - line = self.readline() - if not line: - break - out.append(line) - if size > -1: - nbr += len(line) - if nbr > size: - break - # END handle size constraint - # END readline loop - return out - - # skipcq: PYL-E0301 - def __iter__(self) -> "Git.CatFileContentStream": - return self - - def __next__(self) -> bytes: - line = self.readline() - if not line: - raise StopIteration - - return line - - next = __next__ + AutoInterrupt = _AutoInterrupt - def __del__(self) -> None: - bytes_left = self._size - self._nbr - if bytes_left: - # Read and discard - seeking is impossible within a stream. - # This includes any terminating newline. - self._stream.read(bytes_left + 1) - # END handle incomplete read + CatFileContentStream = _CatFileContentStream def __init__(self, working_dir: Union[None, PathLike] = None) -> None: """Initialize this instance with: