@@ -95,6 +95,8 @@ async def acquire(self):
95
95
This method blocks until the lock is unlocked, then sets it to
96
96
locked and returns True.
97
97
"""
98
+ # Implement fair scheduling, where thread always waits
99
+ # its turn. Jumping the queue if all are cancelled is an optimization.
98
100
if (not self ._locked and (self ._waiters is None or
99
101
all (w .cancelled () for w in self ._waiters ))):
100
102
self ._locked = True
@@ -105,19 +107,22 @@ async def acquire(self):
105
107
fut = self ._get_loop ().create_future ()
106
108
self ._waiters .append (fut )
107
109
108
- # Finally block should be called before the CancelledError
109
- # handling as we don't want CancelledError to call
110
- # _wake_up_first() and attempt to wake up itself.
111
110
try :
112
111
try :
113
112
await fut
114
113
finally :
115
114
self ._waiters .remove (fut )
116
115
except exceptions .CancelledError :
116
+ # Currently the only exception designed be able to occur here.
117
+
118
+ # Ensure the lock invariant: If lock is not claimed (or about
119
+ # to be claimed by us) and there is a Task in waiters,
120
+ # ensure that the Task at the head will run.
117
121
if not self ._locked :
118
122
self ._wake_up_first ()
119
123
raise
120
124
125
+ # assert self._locked is False
121
126
self ._locked = True
122
127
return True
123
128
@@ -139,17 +144,15 @@ def release(self):
139
144
raise RuntimeError ('Lock is not acquired.' )
140
145
141
146
def _wake_up_first (self ):
142
- """Wake up the first waiter if it isn't done ."""
147
+ """Ensure that the first waiter will wake up ."""
143
148
if not self ._waiters :
144
149
return
145
150
try :
146
151
fut = next (iter (self ._waiters ))
147
152
except StopIteration :
148
153
return
149
154
150
- # .done() necessarily means that a waiter will wake up later on and
151
- # either take the lock, or, if it was cancelled and lock wasn't
152
- # taken already, will hit this again and wake up a new waiter.
155
+ # .done() means that the waiter is already set to wake up.
153
156
if not fut .done ():
154
157
fut .set_result (True )
155
158
@@ -269,17 +272,22 @@ async def wait(self):
269
272
self ._waiters .remove (fut )
270
273
271
274
finally :
272
- # Must reacquire lock even if wait is cancelled
273
- cancelled = False
275
+ # Must re-acquire lock even if wait is cancelled.
276
+ # We only catch CancelledError here, since we don't want any
277
+ # other (fatal) errors with the future to cause us to spin.
278
+ err = None
274
279
while True :
275
280
try :
276
281
await self .acquire ()
277
282
break
278
- except exceptions .CancelledError :
279
- cancelled = True
283
+ except exceptions .CancelledError as e :
284
+ err = e
280
285
281
- if cancelled :
282
- raise exceptions .CancelledError
286
+ if err :
287
+ try :
288
+ raise err # Re-raise most recent exception instance.
289
+ finally :
290
+ err = None # Break reference cycles.
283
291
284
292
async def wait_for (self , predicate ):
285
293
"""Wait until a predicate becomes true.
@@ -357,6 +365,7 @@ def __repr__(self):
357
365
358
366
def locked (self ):
359
367
"""Returns True if semaphore cannot be acquired immediately."""
368
+ # Due to state, or FIFO rules (must allow others to run first).
360
369
return self ._value == 0 or (
361
370
any (not w .cancelled () for w in (self ._waiters or ())))
362
371
@@ -370,6 +379,7 @@ async def acquire(self):
370
379
True.
371
380
"""
372
381
if not self .locked ():
382
+ # Maintain FIFO, wait for others to start even if _value > 0.
373
383
self ._value -= 1
374
384
return True
375
385
@@ -378,22 +388,27 @@ async def acquire(self):
378
388
fut = self ._get_loop ().create_future ()
379
389
self ._waiters .append (fut )
380
390
381
- # Finally block should be called before the CancelledError
382
- # handling as we don't want CancelledError to call
383
- # _wake_up_first() and attempt to wake up itself.
384
391
try :
385
392
try :
386
393
await fut
387
394
finally :
388
395
self ._waiters .remove (fut )
389
396
except exceptions .CancelledError :
390
- if not fut .cancelled ():
397
+ # Currently the only exception designed be able to occur here.
398
+ if fut .done () and not fut .cancelled ():
399
+ # Our Future was successfully set to True via _wake_up_next(),
400
+ # but we are not about to successfully acquire(). Therefore we
401
+ # must undo the bookkeeping already done and attempt to wake
402
+ # up someone else.
391
403
self ._value += 1
392
- self ._wake_up_next ()
393
404
raise
394
405
395
- if self ._value > 0 :
396
- self ._wake_up_next ()
406
+ finally :
407
+ # New waiters may have arrived but had to wait due to FIFO.
408
+ # Wake up as many as are allowed.
409
+ while self ._value > 0 :
410
+ if not self ._wake_up_next ():
411
+ break # There was no-one to wake up.
397
412
return True
398
413
399
414
def release (self ):
@@ -408,13 +423,15 @@ def release(self):
408
423
def _wake_up_next (self ):
409
424
"""Wake up the first waiter that isn't done."""
410
425
if not self ._waiters :
411
- return
426
+ return False
412
427
413
428
for fut in self ._waiters :
414
429
if not fut .done ():
415
430
self ._value -= 1
416
431
fut .set_result (True )
417
- return
432
+ # `fut` is now `done()` and not `cancelled()`.
433
+ return True
434
+ return False
418
435
419
436
420
437
class BoundedSemaphore (Semaphore ):
0 commit comments