Skip to content

Add test.support.interpreters at 3.13.2 #5684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions Lib/test/support/interpreters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
"""Subinterpreters High Level Module."""

import threading
import weakref
import _interpreters

# aliases:
from _interpreters import (
InterpreterError, InterpreterNotFoundError, NotShareableError,
is_shareable,
)


__all__ = [
'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
'Interpreter',
'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
'NotShareableError',
'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
]


_queuemod = None

def __getattr__(name):
if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'):
global create_queue, Queue, QueueEmpty, QueueFull
ns = globals()
from .queues import (
create as create_queue,
Queue, QueueEmpty, QueueFull,
)
return ns[name]
else:
raise AttributeError(name)


_EXEC_FAILURE_STR = """
{superstr}

Uncaught in the interpreter:

{formatted}
""".strip()

class ExecutionFailed(InterpreterError):
"""An unhandled exception happened during execution.

This is raised from Interpreter.exec() and Interpreter.call().
"""

def __init__(self, excinfo):
msg = excinfo.formatted
if not msg:
if excinfo.type and excinfo.msg:
msg = f'{excinfo.type.__name__}: {excinfo.msg}'
else:
msg = excinfo.type.__name__ or excinfo.msg
super().__init__(msg)
self.excinfo = excinfo

def __str__(self):
try:
formatted = self.excinfo.errdisplay
except Exception:
return super().__str__()
else:
return _EXEC_FAILURE_STR.format(
superstr=super().__str__(),
formatted=formatted,
)


def create():
"""Return a new (idle) Python interpreter."""
id = _interpreters.create(reqrefs=True)
return Interpreter(id, _ownsref=True)


def list_all():
"""Return all existing interpreters."""
return [Interpreter(id, _whence=whence)
for id, whence in _interpreters.list_all(require_ready=True)]


def get_current():
"""Return the currently running interpreter."""
id, whence = _interpreters.get_current()
return Interpreter(id, _whence=whence)


def get_main():
"""Return the main interpreter."""
id, whence = _interpreters.get_main()
assert whence == _interpreters.WHENCE_RUNTIME, repr(whence)
return Interpreter(id, _whence=whence)


_known = weakref.WeakValueDictionary()

class Interpreter:
"""A single Python interpreter.

Attributes:

"id" - the unique process-global ID number for the interpreter
"whence" - indicates where the interpreter was created

If the interpreter wasn't created by this module
then any method that modifies the interpreter will fail,
i.e. .close(), .prepare_main(), .exec(), and .call()
"""

_WHENCE_TO_STR = {
_interpreters.WHENCE_UNKNOWN: 'unknown',
_interpreters.WHENCE_RUNTIME: 'runtime init',
_interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
_interpreters.WHENCE_CAPI: 'C-API',
_interpreters.WHENCE_XI: 'cross-interpreter C-API',
_interpreters.WHENCE_STDLIB: '_interpreters module',
}

def __new__(cls, id, /, _whence=None, _ownsref=None):
# There is only one instance for any given ID.
if not isinstance(id, int):
raise TypeError(f'id must be an int, got {id!r}')
id = int(id)
if _whence is None:
if _ownsref:
_whence = _interpreters.WHENCE_STDLIB
else:
_whence = _interpreters.whence(id)
assert _whence in cls._WHENCE_TO_STR, repr(_whence)
if _ownsref is None:
_ownsref = (_whence == _interpreters.WHENCE_STDLIB)
try:
self = _known[id]
assert hasattr(self, '_ownsref')
except KeyError:
self = super().__new__(cls)
_known[id] = self
self._id = id
self._whence = _whence
self._ownsref = _ownsref
if _ownsref:
# This may raise InterpreterNotFoundError:
_interpreters.incref(id)
return self

def __repr__(self):
return f'{type(self).__name__}({self.id})'

def __hash__(self):
return hash(self._id)

def __del__(self):
self._decref()

# for pickling:
def __getnewargs__(self):
return (self._id,)

# for pickling:
def __getstate__(self):
return None

def _decref(self):
if not self._ownsref:
return
self._ownsref = False
try:
_interpreters.decref(self._id)
except InterpreterNotFoundError:
pass

@property
def id(self):
return self._id

@property
def whence(self):
return self._WHENCE_TO_STR[self._whence]

def is_running(self):
"""Return whether or not the identified interpreter is running."""
return _interpreters.is_running(self._id)

# Everything past here is available only to interpreters created by
# interpreters.create().

def close(self):
"""Finalize and destroy the interpreter.

Attempting to destroy the current interpreter results
in an InterpreterError.
"""
return _interpreters.destroy(self._id, restrict=True)

def prepare_main(self, ns=None, /, **kwargs):
"""Bind the given values into the interpreter's __main__.

The values must be shareable.
"""
ns = dict(ns, **kwargs) if ns is not None else kwargs
_interpreters.set___main___attrs(self._id, ns, restrict=True)

def exec(self, code, /):
"""Run the given source code in the interpreter.

This is essentially the same as calling the builtin "exec"
with this interpreter, using the __dict__ of its __main__
module as both globals and locals.

There is no return value.

If the code raises an unhandled exception then an ExecutionFailed
exception is raised, which summarizes the unhandled exception.
The actual exception is discarded because objects cannot be
shared between interpreters.

This blocks the current Python thread until done. During
that time, the previous interpreter is allowed to run
in other threads.
"""
excinfo = _interpreters.exec(self._id, code, restrict=True)
if excinfo is not None:
raise ExecutionFailed(excinfo)

def call(self, callable, /):
"""Call the object in the interpreter with given args/kwargs.

Only functions that take no arguments and have no closure
are supported.

The return value is discarded.

If the callable raises an exception then the error display
(including full traceback) is send back between the interpreters
and an ExecutionFailed exception is raised, much like what
happens with Interpreter.exec().
"""
# XXX Support args and kwargs.
# XXX Support arbitrary callables.
# XXX Support returning the return value (e.g. via pickle).
excinfo = _interpreters.call(self._id, callable, restrict=True)
if excinfo is not None:
raise ExecutionFailed(excinfo)

def call_in_thread(self, callable, /):
"""Return a new thread that calls the object in the interpreter.

The return value and any raised exception are discarded.
"""
def task():
self.call(callable)
t = threading.Thread(target=task)
t.start()
return t
102 changes: 102 additions & 0 deletions Lib/test/support/interpreters/_crossinterp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Common code between queues and channels."""


class ItemInterpreterDestroyed(Exception):
"""Raised when trying to get an item whose interpreter was destroyed."""


class classonly:
"""A non-data descriptor that makes a value only visible on the class.

This is like the "classmethod" builtin, but does not show up on
instances of the class. It may be used as a decorator.
"""

def __init__(self, value):
self.value = value
self.getter = classmethod(value).__get__
self.name = None

def __set_name__(self, cls, name):
if self.name is not None:
raise TypeError('already used')
self.name = name

def __get__(self, obj, cls):
if obj is not None:
raise AttributeError(self.name)
# called on the class
return self.getter(None, cls)


class UnboundItem:
"""Represents a cross-interpreter item no longer bound to an interpreter.

An item is unbound when the interpreter that added it to the
cross-interpreter container is destroyed.
"""

__slots__ = ()

@classonly
def singleton(cls, kind, module, name='UNBOUND'):
doc = cls.__doc__.replace('cross-interpreter container', kind)
doc = doc.replace('cross-interpreter', kind)
subclass = type(
f'Unbound{kind.capitalize()}Item',
(cls,),
dict(
_MODULE=module,
_NAME=name,
__doc__=doc,
),
)
return object.__new__(subclass)

_MODULE = __name__
_NAME = 'UNBOUND'

def __new__(cls):
raise Exception(f'use {cls._MODULE}.{cls._NAME}')

def __repr__(self):
return f'{self._MODULE}.{self._NAME}'
# return f'interpreters.queues.UNBOUND'


UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()

_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}


def serialize_unbound(unbound):
op = unbound
try:
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,


def resolve_unbound(flag, exctype_destroyed):
try:
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
if op is UNBOUND_REMOVE:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
raise exctype_destroyed("item's original interpreter destroyed")
elif op is UNBOUND:
return UNBOUND
else:
raise NotImplementedError(repr(op))
Loading
Loading