Skip to content

Commit 72ad0e7

Browse files
committed
python-stdlib/threading: Add Lock, Condition and Event to threading.
1 parent 70e422d commit 72ad0e7

File tree

1 file changed

+236
-0
lines changed

1 file changed

+236
-0
lines changed

python-stdlib/threading/threading.py

+236
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,250 @@
11
import _thread
2+
from time import ticks_ms, ticks_diff
3+
try:
4+
from ucollections import deque as _deque
5+
_deque.clear
6+
except (ImportError, AttributeError):
7+
from collections import deque as _deque
28

39

410
class Thread:
511
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None):
612
self.target = target
713
self.args = args
14+
self.daemon = None
815
self.kwargs = {} if kwargs is None else kwargs
916

1017
def start(self):
1118
_thread.start_new_thread(self.run, ())
1219

1320
def run(self):
1421
self.target(*self.args, **self.kwargs)
22+
23+
24+
Lock = _thread.allocate_lock
25+
26+
27+
class Condition:
28+
"""Class that implements a condition variable.
29+
A condition variable allows one or more threads to wait until they are
30+
notified by another thread.
31+
If the lock argument is given and not None, it must be a Lock or RLock
32+
object, and it is used as the underlying lock. Otherwise, a new RLock object
33+
is created and used as the underlying lock.
34+
"""
35+
36+
def __init__(self, lock=None):
37+
if lock is None:
38+
lock = Lock()
39+
self._lock = lock
40+
# Export the lock's acquire() and release() methods
41+
self.acquire = lock.acquire
42+
self.release = lock.release
43+
# If the lock defines _release_save() and/or _acquire_restore(),
44+
# these override the default implementations (which just call
45+
# release() and acquire() on the lock). Ditto for _is_owned().
46+
try:
47+
self._release_save = lock._release_save
48+
except AttributeError:
49+
pass
50+
try:
51+
self._acquire_restore = lock._acquire_restore
52+
except AttributeError:
53+
pass
54+
try:
55+
self._is_owned = lock._is_owned
56+
except AttributeError:
57+
pass
58+
self._waiters = _deque()
59+
60+
def _at_fork_reinit(self):
61+
self._lock._at_fork_reinit()
62+
self._waiters.clear()
63+
64+
def __enter__(self):
65+
return self._lock.__enter__()
66+
67+
def __exit__(self, *args):
68+
return self._lock.__exit__(*args)
69+
70+
def __repr__(self):
71+
return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
72+
73+
def _release_save(self):
74+
self._lock.release() # No state to save
75+
76+
def _acquire_restore(self, x):
77+
self._lock.acquire() # Ignore saved state
78+
79+
def _is_owned(self):
80+
# Return True if lock is owned by current_thread.
81+
# This method is called only if _lock doesn't have _is_owned().
82+
if self._lock.acquire(False):
83+
self._lock.release()
84+
return False
85+
else:
86+
return True
87+
88+
def wait(self, timeout=None):
89+
"""Wait until notified or until a timeout occurs.
90+
If the calling thread has not acquired the lock when this method is
91+
called, a RuntimeError is raised.
92+
This method releases the underlying lock, and then blocks until it is
93+
awakened by a notify() or notify_all() call for the same condition
94+
variable in another thread, or until the optional timeout occurs. Once
95+
awakened or timed out, it re-acquires the lock and returns.
96+
When the timeout argument is present and not None, it should be a
97+
floating point number specifying a timeout for the operation in seconds
98+
(or fractions thereof).
99+
When the underlying lock is an RLock, it is not released using its
100+
release() method, since this may not actually unlock the lock when it
101+
was acquired multiple times recursively. Instead, an internal interface
102+
of the RLock class is used, which really unlocks it even when it has
103+
been recursively acquired several times. Another internal interface is
104+
then used to restore the recursion level when the lock is reacquired.
105+
"""
106+
if not self._is_owned():
107+
raise RuntimeError("cannot wait on un-acquired lock")
108+
waiter = _thread.allocate_lock()
109+
waiter.acquire()
110+
self._waiters.append(waiter)
111+
saved_state = self._release_save()
112+
gotit = False
113+
try: # restore state no matter what (e.g., KeyboardInterrupt)
114+
if timeout is None:
115+
waiter.acquire()
116+
gotit = True
117+
else:
118+
if timeout > 0:
119+
gotit = waiter.acquire(True, timeout)
120+
else:
121+
gotit = waiter.acquire(False)
122+
return gotit
123+
finally:
124+
self._acquire_restore(saved_state)
125+
if not gotit:
126+
try:
127+
self._waiters.remove(waiter)
128+
except ValueError:
129+
pass
130+
131+
def wait_for(self, predicate, timeout=None):
132+
"""Wait until a condition evaluates to True.
133+
predicate should be a callable which result will be interpreted as a
134+
boolean value. A timeout may be provided giving the maximum time to
135+
wait.
136+
"""
137+
endtime = None
138+
waittime = timeout
139+
result = predicate()
140+
while not result:
141+
if waittime is not None:
142+
if endtime is None:
143+
endtime = ticks_ms() + waittime
144+
else:
145+
waittime = ticks_diff(endtime, ticks_ms())
146+
if waittime <= 0:
147+
break
148+
self.wait(waittime)
149+
result = predicate()
150+
return result
151+
152+
def notify(self, n=1):
153+
"""Wake up one or more threads waiting on this condition, if any.
154+
If the calling thread has not acquired the lock when this method is
155+
called, a RuntimeError is raised.
156+
This method wakes up at most n of the threads waiting for the condition
157+
variable; it is a no-op if no threads are waiting.
158+
"""
159+
if not self._is_owned():
160+
raise RuntimeError("cannot notify on un-acquired lock")
161+
waiters = self._waiters
162+
while waiters and n > 0:
163+
waiter = waiters[0]
164+
try:
165+
waiter.release()
166+
except RuntimeError:
167+
# gh-92530: The previous call of notify() released the lock,
168+
# but was interrupted before removing it from the queue.
169+
# It can happen if a signal handler raises an exception,
170+
# like CTRL+C which raises KeyboardInterrupt.
171+
pass
172+
else:
173+
n -= 1
174+
try:
175+
waiters.remove(waiter)
176+
except ValueError:
177+
pass
178+
179+
def notify_all(self):
180+
"""Wake up all threads waiting on this condition.
181+
If the calling thread has not acquired the lock when this method
182+
is called, a RuntimeError is raised.
183+
"""
184+
self.notify(len(self._waiters))
185+
186+
187+
188+
class Event:
189+
"""Class implementing event objects.
190+
191+
Events manage a flag that can be set to true with the set() method and reset
192+
to false with the clear() method. The wait() method blocks until the flag is
193+
true. The flag is initially false.
194+
195+
"""
196+
197+
def __init__(self):
198+
self._cond = Condition(Lock())
199+
self._flag = False
200+
201+
def _at_fork_reinit(self):
202+
# Private method called by Thread._reset_internal_locks()
203+
self._cond._at_fork_reinit()
204+
205+
def is_set(self):
206+
"""Return true if and only if the internal flag is true."""
207+
return self._flag
208+
209+
210+
def set(self):
211+
"""Set the internal flag to true.
212+
213+
All threads waiting for it to become true are awakened. Threads
214+
that call wait() once the flag is true will not block at all.
215+
216+
"""
217+
with self._cond:
218+
self._flag = True
219+
self._cond.notify_all()
220+
221+
def clear(self):
222+
"""Reset the internal flag to false.
223+
224+
Subsequently, threads calling wait() will block until set() is called to
225+
set the internal flag to true again.
226+
227+
"""
228+
with self._cond:
229+
self._flag = False
230+
231+
def wait(self, timeout=None):
232+
"""Block until the internal flag is true.
233+
234+
If the internal flag is true on entry, return immediately. Otherwise,
235+
block until another thread calls set() to set the flag to true, or until
236+
the optional timeout occurs.
237+
238+
When the timeout argument is present and not None, it should be a
239+
floating point number specifying a timeout for the operation in seconds
240+
(or fractions thereof).
241+
242+
This method returns the internal flag on exit, so it will always return
243+
True except if a timeout is given and the operation times out.
244+
245+
"""
246+
with self._cond:
247+
signaled = self._flag
248+
if not signaled:
249+
signaled = self._cond.wait(timeout)
250+
return signaled

0 commit comments

Comments
 (0)