Skip to content

python-stdlib/threading: Add Lock, Condition and Event to threading. #503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 327 additions & 1 deletion python-stdlib/threading/threading.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,340 @@
import _thread
from time import ticks_ms, ticks_diff

try:
from ucollections import deque as _deque

_deque.clear
except (ImportError, AttributeError):
from collections import deque as _deque


class Thread:
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None):
self.target = target
self.args = args
self.daemon = None
self.kwargs = {} if kwargs is None else kwargs
self.ident = None

self._started = False
self._lock = Lock()
self._ret = None
self._ex = None

def start(self):
self._lock.acquire()
_thread.start_new_thread(self.run, ())
self._started = True

def run(self):
self.target(*self.args, **self.kwargs)
self.ident = _thread.get_ident()
try:
self._ret = self.target(*self.args, **self.kwargs)
except Exception as ex:
self._ex = ex
self._lock.release()

def join(self, timeout=None):
if not self._started:
raise RuntimeError("cannot join thread before it is started")
self._lock.acquire(True, timeout)
if self._ex:
raise self._ex
return self._ret


Lock = _thread.allocate_lock


class Condition:
"""Class that implements a condition variable."""

# A condition variable allows one or more threads to wait until they are
# notified by another thread.
# If the lock argument is given and not None, it must be a Lock or RLock
# object, and it is used as the underlying lock. Otherwise, a new RLock object
# is created and used as the underlying lock.

def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()

def _at_fork_reinit(self):
self._lock._at_fork_reinit()
self._waiters.clear()

def __enter__(self):
return self._lock.__enter__()

def __exit__(self, *args):
return self._lock.__exit__(*args)

def __repr__(self):
return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

def _release_save(self):
self._lock.release() # No state to save

def _acquire_restore(self, x):
self._lock.acquire() # Ignore saved state

def _is_owned(self):
"""Return True if lock is owned by current_thread."""
# This method is called only if _lock doesn't have _is_owned().
if self._lock.acquire(False):
self._lock.release()
return False
else:
return True

def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs."""
# If the calling thread has not acquired the lock when this method is
# called, a RuntimeError is raised.
# This method releases the underlying lock, and then blocks until it is
# awakened by a notify() or notify_all() call for the same condition
# variable in another thread, or until the optional timeout occurs. Once
# awakened or timed out, it re-acquires the lock and returns.
# When the timeout argument is present and not None, it should be a
# floating point number specifying a timeout for the operation in seconds
# (or fractions thereof).
# When the underlying lock is an RLock, it is not released using its
# release() method, since this may not actually unlock the lock when it
# was acquired multiple times recursively. Instead, an internal interface
# of the RLock class is used, which really unlocks it even when it has
# been recursively acquired several times. Another internal interface is
# then used to restore the recursion level when the lock is reacquired.
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _thread.allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass

def wait_for(self, predicate, timeout=None):
"""Wait until a condition evaluates to True."""
# predicate should be a callable which result will be interpreted as a
# boolean value. A timeout may be provided giving the maximum time to
# wait.
endtime = None
waittime = timeout
result = predicate()
while not result:
if waittime is not None:
if endtime is None:
endtime = ticks_ms() + int(waittime * 1000)
else:
waittime = ticks_diff(endtime, ticks_ms())
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result

def notify(self, n=1):
"""Wake up one or more threads waiting on this condition, if any."""
# If the calling thread has not acquired the lock when this method is
# called, a RuntimeError is raised.
# This method wakes up at most n of the threads waiting for the condition
# variable; it is a no-op if no threads are waiting.

if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
waiters = self._waiters
while waiters and n > 0:
waiter = waiters[0]
try:
waiter.release()
except RuntimeError:
# gh-92530: The previous call of notify() released the lock,
# but was interrupted before removing it from the queue.
# It can happen if a signal handler raises an exception,
# like CTRL+C which raises KeyboardInterrupt.
pass
else:
n -= 1
try:
waiters.remove(waiter)
except ValueError:
pass

def notify_all(self):
"""Wake up all threads waiting on this condition."""
# If the calling thread has not acquired the lock when this method
# is called, a RuntimeError is raised.
self.notify(len(self._waiters))


class Event:
"""Class implementing event objects."""

# Events manage a flag that can be set to true with the set() method and reset
# to false with the clear() method. The wait() method blocks until the flag is
# true. The flag is initially false.

def __init__(self):
self._cond = Condition(Lock())
self._flag = False

def _at_fork_reinit(self):
# Private method called by Thread._reset_internal_locks()
self._cond._at_fork_reinit()

def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag

def set(self):
"""Set the internal flag to true."""

# All threads waiting for it to become true are awakened. Threads
# that call wait() once the flag is true will not block at all.

with self._cond:
self._flag = True
self._cond.notify_all()

def clear(self):
"""Reset the internal flag to false."""

# Subsequently, threads calling wait() will block until set() is called to
# set the internal flag to true again.

with self._cond:
self._flag = False

def wait(self, timeout=None):
"""Block until the internal flag is true."""

# If the internal flag is true on entry, return immediately. Otherwise,
# block until another thread calls set() to set the flag to true, or until
# the optional timeout occurs.

# When the timeout argument is present and not None, it should be a
# floating point number specifying a timeout for the operation in seconds
# (or fractions thereof).

# This method returns the internal flag on exit, so it will always return
# True except if a timeout is given and the operation times out.

with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled


class Semaphore:
"""This class implements semaphore objects."""

# Semaphores manage a counter representing the number of release() calls minus
# the number of acquire() calls, plus an initial value. The acquire() method
# blocks if necessary until it can return without making the counter
# negative. If not given, value defaults to 1.

def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value

def acquire(self, blocking=True, timeout=None):
"""Acquire a semaphore, decrementing the internal counter by one."""

# When invoked without arguments: if the internal counter is larger than
# zero on entry, decrement it by one and return immediately. If it is zero
# on entry, block, waiting until some other thread has called release() to
# make it larger than zero. This is done with proper interlocking so that
# if multiple acquire() calls are blocked, release() will wake exactly one
# of them up. The implementation may pick one at random, so the order in
# which blocked threads are awakened should not be relied on. There is no
# return value in this case.

# When invoked with blocking set to true, do the same thing as when called
# without arguments, and return true.

# When invoked with blocking set to false, do not block. If a call without
# an argument would block, return false immediately; otherwise, do the
# same thing as when called without arguments, and return true.

# When invoked with a timeout other than None, it will block for at
# most timeout seconds. If acquire does not complete successfully in
# that interval, return false. Return true otherwise.

if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = ticks_ms() + int(timeout * 1000)
else:
timeout = ticks_diff(endtime, ticks_ms())
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc

__enter__ = acquire

def release(self, n=1):
"""Release a semaphore, incrementing the internal counter by one or more."""

# When the counter is zero on entry and another thread is waiting for it
# to become larger than zero again, wake up that thread.

if n < 1:
raise ValueError("n must be one or more")
with self._cond:
self._value += n
for i in range(n):
self._cond.notify()

def __exit__(self, t, v, tb):
self.release()