From 1588665407f95aefcf33155546fbbe3c5c161f5d Mon Sep 17 00:00:00 2001 From: Ashwin Naren Date: Thu, 10 Apr 2025 09:52:59 -0700 Subject: [PATCH] Add test.support.interpreters at 3.13.2 --- Lib/test/support/interpreters/__init__.py | 258 +++++++++++++++ Lib/test/support/interpreters/_crossinterp.py | 102 ++++++ Lib/test/support/interpreters/channels.py | 257 ++++++++++++++ Lib/test/support/interpreters/queues.py | 313 ++++++++++++++++++ 4 files changed, 930 insertions(+) create mode 100644 Lib/test/support/interpreters/__init__.py create mode 100644 Lib/test/support/interpreters/_crossinterp.py create mode 100644 Lib/test/support/interpreters/channels.py create mode 100644 Lib/test/support/interpreters/queues.py diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py new file mode 100644 index 0000000000..e067f25936 --- /dev/null +++ b/Lib/test/support/interpreters/__init__.py @@ -0,0 +1,258 @@ +"""Subinterpreters High Level Module.""" + +import threading +import weakref +import _interpreters + +# aliases: +from _interpreters import ( + InterpreterError, InterpreterNotFoundError, NotShareableError, + is_shareable, +) + + +__all__ = [ + 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', + 'Interpreter', + 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed', + 'NotShareableError', + 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', +] + + +_queuemod = None + +def __getattr__(name): + if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'): + global create_queue, Queue, QueueEmpty, QueueFull + ns = globals() + from .queues import ( + create as create_queue, + Queue, QueueEmpty, QueueFull, + ) + return ns[name] + else: + raise AttributeError(name) + + +_EXEC_FAILURE_STR = """ +{superstr} + +Uncaught in the interpreter: + +{formatted} +""".strip() + +class ExecutionFailed(InterpreterError): + """An unhandled exception happened during execution. + + This is raised from Interpreter.exec() and Interpreter.call(). + """ + + def __init__(self, excinfo): + msg = excinfo.formatted + if not msg: + if excinfo.type and excinfo.msg: + msg = f'{excinfo.type.__name__}: {excinfo.msg}' + else: + msg = excinfo.type.__name__ or excinfo.msg + super().__init__(msg) + self.excinfo = excinfo + + def __str__(self): + try: + formatted = self.excinfo.errdisplay + except Exception: + return super().__str__() + else: + return _EXEC_FAILURE_STR.format( + superstr=super().__str__(), + formatted=formatted, + ) + + +def create(): + """Return a new (idle) Python interpreter.""" + id = _interpreters.create(reqrefs=True) + return Interpreter(id, _ownsref=True) + + +def list_all(): + """Return all existing interpreters.""" + return [Interpreter(id, _whence=whence) + for id, whence in _interpreters.list_all(require_ready=True)] + + +def get_current(): + """Return the currently running interpreter.""" + id, whence = _interpreters.get_current() + return Interpreter(id, _whence=whence) + + +def get_main(): + """Return the main interpreter.""" + id, whence = _interpreters.get_main() + assert whence == _interpreters.WHENCE_RUNTIME, repr(whence) + return Interpreter(id, _whence=whence) + + +_known = weakref.WeakValueDictionary() + +class Interpreter: + """A single Python interpreter. + + Attributes: + + "id" - the unique process-global ID number for the interpreter + "whence" - indicates where the interpreter was created + + If the interpreter wasn't created by this module + then any method that modifies the interpreter will fail, + i.e. .close(), .prepare_main(), .exec(), and .call() + """ + + _WHENCE_TO_STR = { + _interpreters.WHENCE_UNKNOWN: 'unknown', + _interpreters.WHENCE_RUNTIME: 'runtime init', + _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API', + _interpreters.WHENCE_CAPI: 'C-API', + _interpreters.WHENCE_XI: 'cross-interpreter C-API', + _interpreters.WHENCE_STDLIB: '_interpreters module', + } + + def __new__(cls, id, /, _whence=None, _ownsref=None): + # There is only one instance for any given ID. + if not isinstance(id, int): + raise TypeError(f'id must be an int, got {id!r}') + id = int(id) + if _whence is None: + if _ownsref: + _whence = _interpreters.WHENCE_STDLIB + else: + _whence = _interpreters.whence(id) + assert _whence in cls._WHENCE_TO_STR, repr(_whence) + if _ownsref is None: + _ownsref = (_whence == _interpreters.WHENCE_STDLIB) + try: + self = _known[id] + assert hasattr(self, '_ownsref') + except KeyError: + self = super().__new__(cls) + _known[id] = self + self._id = id + self._whence = _whence + self._ownsref = _ownsref + if _ownsref: + # This may raise InterpreterNotFoundError: + _interpreters.incref(id) + return self + + def __repr__(self): + return f'{type(self).__name__}({self.id})' + + def __hash__(self): + return hash(self._id) + + def __del__(self): + self._decref() + + # for pickling: + def __getnewargs__(self): + return (self._id,) + + # for pickling: + def __getstate__(self): + return None + + def _decref(self): + if not self._ownsref: + return + self._ownsref = False + try: + _interpreters.decref(self._id) + except InterpreterNotFoundError: + pass + + @property + def id(self): + return self._id + + @property + def whence(self): + return self._WHENCE_TO_STR[self._whence] + + def is_running(self): + """Return whether or not the identified interpreter is running.""" + return _interpreters.is_running(self._id) + + # Everything past here is available only to interpreters created by + # interpreters.create(). + + def close(self): + """Finalize and destroy the interpreter. + + Attempting to destroy the current interpreter results + in an InterpreterError. + """ + return _interpreters.destroy(self._id, restrict=True) + + def prepare_main(self, ns=None, /, **kwargs): + """Bind the given values into the interpreter's __main__. + + The values must be shareable. + """ + ns = dict(ns, **kwargs) if ns is not None else kwargs + _interpreters.set___main___attrs(self._id, ns, restrict=True) + + def exec(self, code, /): + """Run the given source code in the interpreter. + + This is essentially the same as calling the builtin "exec" + with this interpreter, using the __dict__ of its __main__ + module as both globals and locals. + + There is no return value. + + If the code raises an unhandled exception then an ExecutionFailed + exception is raised, which summarizes the unhandled exception. + The actual exception is discarded because objects cannot be + shared between interpreters. + + This blocks the current Python thread until done. During + that time, the previous interpreter is allowed to run + in other threads. + """ + excinfo = _interpreters.exec(self._id, code, restrict=True) + if excinfo is not None: + raise ExecutionFailed(excinfo) + + def call(self, callable, /): + """Call the object in the interpreter with given args/kwargs. + + Only functions that take no arguments and have no closure + are supported. + + The return value is discarded. + + If the callable raises an exception then the error display + (including full traceback) is send back between the interpreters + and an ExecutionFailed exception is raised, much like what + happens with Interpreter.exec(). + """ + # XXX Support args and kwargs. + # XXX Support arbitrary callables. + # XXX Support returning the return value (e.g. via pickle). + excinfo = _interpreters.call(self._id, callable, restrict=True) + if excinfo is not None: + raise ExecutionFailed(excinfo) + + def call_in_thread(self, callable, /): + """Return a new thread that calls the object in the interpreter. + + The return value and any raised exception are discarded. + """ + def task(): + self.call(callable) + t = threading.Thread(target=task) + t.start() + return t diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py new file mode 100644 index 0000000000..544e197ba4 --- /dev/null +++ b/Lib/test/support/interpreters/_crossinterp.py @@ -0,0 +1,102 @@ +"""Common code between queues and channels.""" + + +class ItemInterpreterDestroyed(Exception): + """Raised when trying to get an item whose interpreter was destroyed.""" + + +class classonly: + """A non-data descriptor that makes a value only visible on the class. + + This is like the "classmethod" builtin, but does not show up on + instances of the class. It may be used as a decorator. + """ + + def __init__(self, value): + self.value = value + self.getter = classmethod(value).__get__ + self.name = None + + def __set_name__(self, cls, name): + if self.name is not None: + raise TypeError('already used') + self.name = name + + def __get__(self, obj, cls): + if obj is not None: + raise AttributeError(self.name) + # called on the class + return self.getter(None, cls) + + +class UnboundItem: + """Represents a cross-interpreter item no longer bound to an interpreter. + + An item is unbound when the interpreter that added it to the + cross-interpreter container is destroyed. + """ + + __slots__ = () + + @classonly + def singleton(cls, kind, module, name='UNBOUND'): + doc = cls.__doc__.replace('cross-interpreter container', kind) + doc = doc.replace('cross-interpreter', kind) + subclass = type( + f'Unbound{kind.capitalize()}Item', + (cls,), + dict( + _MODULE=module, + _NAME=name, + __doc__=doc, + ), + ) + return object.__new__(subclass) + + _MODULE = __name__ + _NAME = 'UNBOUND' + + def __new__(cls): + raise Exception(f'use {cls._MODULE}.{cls._NAME}') + + def __repr__(self): + return f'{self._MODULE}.{self._NAME}' +# return f'interpreters.queues.UNBOUND' + + +UNBOUND = object.__new__(UnboundItem) +UNBOUND_ERROR = object() +UNBOUND_REMOVE = object() + +_UNBOUND_CONSTANT_TO_FLAG = { + UNBOUND_REMOVE: 1, + UNBOUND_ERROR: 2, + UNBOUND: 3, +} +_UNBOUND_FLAG_TO_CONSTANT = {v: k + for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} + + +def serialize_unbound(unbound): + op = unbound + try: + flag = _UNBOUND_CONSTANT_TO_FLAG[op] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {op!r}') + return flag, + + +def resolve_unbound(flag, exctype_destroyed): + try: + op = _UNBOUND_FLAG_TO_CONSTANT[flag] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') + if op is UNBOUND_REMOVE: + # "remove" not possible here + raise NotImplementedError + elif op is UNBOUND_ERROR: + raise exctype_destroyed("item's original interpreter destroyed") + elif op is UNBOUND: + return UNBOUND + else: + raise NotImplementedError(repr(op)) diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py new file mode 100644 index 0000000000..d2bd93d77f --- /dev/null +++ b/Lib/test/support/interpreters/channels.py @@ -0,0 +1,257 @@ +"""Cross-interpreter Channels High Level Module.""" + +import time +import _interpchannels as _channels +from . import _crossinterp + +# aliases: +from _interpchannels import ( + ChannelError, ChannelNotFoundError, ChannelClosedError, + ChannelEmptyError, ChannelNotEmptyError, +) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) + + +__all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', + 'create', 'list_all', + 'SendChannel', 'RecvChannel', + 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', + 'ItemInterpreterDestroyed', +] + + +class ItemInterpreterDestroyed(ChannelError, + _crossinterp.ItemInterpreterDestroyed): + """Raised from get() and get_nowait().""" + + +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) + + +def _serialize_unbound(unbound): + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) + + +def _resolve_unbound(flag): + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved + + +def create(*, unbounditems=UNBOUND): + """Return (recv, send) for a new cross-interpreter channel. + + The channel may be used to pass data safely between interpreters. + + "unbounditems" sets the default for the send end of the channel. + See SendChannel.send() for supported values. The default value + is UNBOUND, which replaces the unbound item when received. + """ + unbound = _serialize_unbound(unbounditems) + unboundop, = unbound + cid = _channels.create(unboundop) + recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound) + return recv, send + + +def list_all(): + """Return a list of (recv, send) for all open channels.""" + return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound)) + for cid, unbound in _channels.list_all()] + + +class _ChannelEnd: + """The base class for RecvChannel and SendChannel.""" + + _end = None + + def __new__(cls, cid): + self = super().__new__(cls) + if self._end == 'send': + cid = _channels._channel_id(cid, send=True, force=True) + elif self._end == 'recv': + cid = _channels._channel_id(cid, recv=True, force=True) + else: + raise NotImplementedError(self._end) + self._id = cid + return self + + def __repr__(self): + return f'{type(self).__name__}(id={int(self._id)})' + + def __hash__(self): + return hash(self._id) + + def __eq__(self, other): + if isinstance(self, RecvChannel): + if not isinstance(other, RecvChannel): + return NotImplemented + elif not isinstance(other, SendChannel): + return NotImplemented + return other._id == self._id + + # for pickling: + def __getnewargs__(self): + return (int(self._id),) + + # for pickling: + def __getstate__(self): + return None + + @property + def id(self): + return self._id + + @property + def _info(self): + return _channels.get_info(self._id) + + @property + def is_closed(self): + return self._info.closed + + +_NOT_SET = object() + + +class RecvChannel(_ChannelEnd): + """The receiving end of a cross-interpreter channel.""" + + _end = 'recv' + + def recv(self, timeout=None, *, + _sentinel=object(), + _delay=10 / 1000, # 10 milliseconds + ): + """Return the next object from the channel. + + This blocks until an object has been sent, if none have been + sent already. + """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + obj, unboundop = _channels.recv(self._id, _sentinel) + while obj is _sentinel: + time.sleep(_delay) + if timeout is not None and time.time() >= end: + raise TimeoutError + obj, unboundop = _channels.recv(self._id, _sentinel) + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + return obj + + def recv_nowait(self, default=_NOT_SET): + """Return the next object from the channel. + + If none have been sent then return the default if one + is provided or fail with ChannelEmptyError. Otherwise this + is the same as recv(). + """ + if default is _NOT_SET: + obj, unboundop = _channels.recv(self._id) + else: + obj, unboundop = _channels.recv(self._id, default) + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + return obj + + def close(self): + _channels.close(self._id, recv=True) + + +class SendChannel(_ChannelEnd): + """The sending end of a cross-interpreter channel.""" + + _end = 'send' + + def __new__(cls, cid, *, _unbound=None): + if _unbound is None: + try: + op = _channels.get_channel_defaults(cid) + _unbound = (op,) + except ChannelNotFoundError: + _unbound = _serialize_unbound(UNBOUND) + self = super().__new__(cls, cid) + self._unbound = _unbound + return self + + @property + def is_closed(self): + info = self._info + return info.closed or info.closing + + def send(self, obj, timeout=None, *, + unbound=None, + ): + """Send the object (i.e. its data) to the channel's receiving end. + + This blocks until the object is received. + """ + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True) + + def send_nowait(self, obj, *, + unbound=None, + ): + """Send the object to the channel's receiving end. + + If the object is immediately received then return True + (else False). Otherwise this is the same as send(). + """ + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + # XXX Note that at the moment channel_send() only ever returns + # None. This should be fixed when channel_send_wait() is added. + # See bpo-32604 and gh-19829. + return _channels.send(self._id, obj, unboundop, blocking=False) + + def send_buffer(self, obj, timeout=None, *, + unbound=None, + ): + """Send the object's buffer to the channel's receiving end. + + This blocks until the object is received. + """ + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + _channels.send_buffer(self._id, obj, unboundop, + timeout=timeout, blocking=True) + + def send_buffer_nowait(self, obj, *, + unbound=None, + ): + """Send the object's buffer to the channel's receiving end. + + If the object is immediately received then return True + (else False). Otherwise this is the same as send(). + """ + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + return _channels.send_buffer(self._id, obj, unboundop, blocking=False) + + def close(self): + _channels.close(self._id, send=True) + + +# XXX This is causing leaks (gh-110318): +_channels._register_end_types(SendChannel, RecvChannel) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py new file mode 100644 index 0000000000..deb8e8613a --- /dev/null +++ b/Lib/test/support/interpreters/queues.py @@ -0,0 +1,313 @@ +"""Cross-interpreter Queues High Level Module.""" + +import pickle +import queue +import time +import weakref +import _interpqueues as _queues +from . import _crossinterp + +# aliases: +from _interpqueues import ( + QueueError, QueueNotFoundError, +) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) + +__all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', + 'create', 'list_all', + 'Queue', + 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', + 'ItemInterpreterDestroyed', +] + + +class QueueEmpty(QueueError, queue.Empty): + """Raised from get_nowait() when the queue is empty. + + It is also raised from get() if it times out. + """ + + +class QueueFull(QueueError, queue.Full): + """Raised from put_nowait() when the queue is full. + + It is also raised from put() if it times out. + """ + + +class ItemInterpreterDestroyed(QueueError, + _crossinterp.ItemInterpreterDestroyed): + """Raised from get() and get_nowait().""" + + +_SHARED_ONLY = 0 +_PICKLED = 1 + + +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) + + +def _serialize_unbound(unbound): + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) + + +def _resolve_unbound(flag): + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved + + +def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): + """Return a new cross-interpreter queue. + + The queue may be used to pass data safely between interpreters. + + "syncobj" sets the default for Queue.put() + and Queue.put_nowait(). + + "unbounditems" likewise sets the default. See Queue.put() for + supported values. The default value is UNBOUND, which replaces + the unbound item. + """ + fmt = _SHARED_ONLY if syncobj else _PICKLED + unbound = _serialize_unbound(unbounditems) + unboundop, = unbound + qid = _queues.create(maxsize, fmt, unboundop) + return Queue(qid, _fmt=fmt, _unbound=unbound) + + +def list_all(): + """Return a list of all open queues.""" + return [Queue(qid, _fmt=fmt, _unbound=(unboundop,)) + for qid, fmt, unboundop in _queues.list_all()] + + +_known_queues = weakref.WeakValueDictionary() + +class Queue: + """A cross-interpreter queue.""" + + def __new__(cls, id, /, *, _fmt=None, _unbound=None): + # There is only one instance for any given ID. + if isinstance(id, int): + id = int(id) + else: + raise TypeError(f'id must be an int, got {id!r}') + if _fmt is None: + if _unbound is None: + _fmt, op = _queues.get_queue_defaults(id) + _unbound = (op,) + else: + _fmt, _ = _queues.get_queue_defaults(id) + elif _unbound is None: + _, op = _queues.get_queue_defaults(id) + _unbound = (op,) + try: + self = _known_queues[id] + except KeyError: + self = super().__new__(cls) + self._id = id + self._fmt = _fmt + self._unbound = _unbound + _known_queues[id] = self + _queues.bind(id) + return self + + def __del__(self): + try: + _queues.release(self._id) + except QueueNotFoundError: + pass + try: + del _known_queues[self._id] + except KeyError: + pass + + def __repr__(self): + return f'{type(self).__name__}({self.id})' + + def __hash__(self): + return hash(self._id) + + # for pickling: + def __getnewargs__(self): + return (self._id,) + + # for pickling: + def __getstate__(self): + return None + + @property + def id(self): + return self._id + + @property + def maxsize(self): + try: + return self._maxsize + except AttributeError: + self._maxsize = _queues.get_maxsize(self._id) + return self._maxsize + + def empty(self): + return self.qsize() == 0 + + def full(self): + return _queues.is_full(self._id) + + def qsize(self): + return _queues.get_count(self._id) + + def put(self, obj, timeout=None, *, + syncobj=None, + unbound=None, + _delay=10 / 1000, # 10 milliseconds + ): + """Add the object to the queue. + + This blocks while the queue is full. + + If "syncobj" is None (the default) then it uses the + queue's default, set with create_queue(). + + If "syncobj" is false then all objects are supported, + at the expense of worse performance. + + If "syncobj" is true then the object must be "shareable". + Examples of "shareable" objects include the builtin singletons, + str, and memoryview. One benefit is that such objects are + passed through the queue efficiently. + + The key difference, though, is conceptual: the corresponding + object returned from Queue.get() will be strictly equivalent + to the given obj. In other words, the two objects will be + effectively indistinguishable from each other, even if the + object is mutable. The received object may actually be the + same object, or a copy (immutable values only), or a proxy. + Regardless, the received object should be treated as though + the original has been shared directly, whether or not it + actually is. That's a slightly different and stronger promise + than just (initial) equality, which is all "syncobj=False" + can promise. + + "unbound" controls the behavior of Queue.get() for the given + object if the current interpreter (calling put()) is later + destroyed. + + If "unbound" is None (the default) then it uses the + queue's default, set with create_queue(), + which is usually UNBOUND. + + If "unbound" is UNBOUND_ERROR then get() will raise an + ItemInterpreterDestroyed exception if the original interpreter + has been destroyed. This does not otherwise affect the queue; + the next call to put() will work like normal, returning the next + item in the queue. + + If "unbound" is UNBOUND_REMOVE then the item will be removed + from the queue as soon as the original interpreter is destroyed. + Be aware that this will introduce an imbalance between put() + and get() calls. + + If "unbound" is UNBOUND then it is returned by get() in place + of the unbound item. + """ + if syncobj is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if syncobj else _PICKLED + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + if fmt is _PICKLED: + obj = pickle.dumps(obj) + while True: + try: + _queues.put(self._id, obj, fmt, unboundop) + except QueueFull as exc: + if timeout is not None and time.time() >= end: + raise # re-raise + time.sleep(_delay) + else: + break + + def put_nowait(self, obj, *, syncobj=None, unbound=None): + if syncobj is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if syncobj else _PICKLED + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + if fmt is _PICKLED: + obj = pickle.dumps(obj) + _queues.put(self._id, obj, fmt, unboundop) + + def get(self, timeout=None, *, + _delay=10 / 1000, # 10 milliseconds + ): + """Return the next object from the queue. + + This blocks while the queue is empty. + + If the next item's original interpreter has been destroyed + then the "next object" is determined by the value of the + "unbound" argument to put(). + """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + while True: + try: + obj, fmt, unboundop = _queues.get(self._id) + except QueueEmpty as exc: + if timeout is not None and time.time() >= end: + raise # re-raise + time.sleep(_delay) + else: + break + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + if fmt == _PICKLED: + obj = pickle.loads(obj) + else: + assert fmt == _SHARED_ONLY + return obj + + def get_nowait(self): + """Return the next object from the channel. + + If the queue is empty then raise QueueEmpty. Otherwise this + is the same as get(). + """ + try: + obj, fmt, unboundop = _queues.get(self._id) + except QueueEmpty as exc: + raise # re-raise + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + if fmt == _PICKLED: + obj = pickle.loads(obj) + else: + assert fmt == _SHARED_ONLY + return obj + + +_queues._register_heap_types(Queue, QueueEmpty, QueueFull)