Skip to content

Commit 115e4c7

Browse files
committed
Support for forthcoming timeouts and task cancellation. Barrier supports nowait.
1 parent fc7aca6 commit 115e4c7

File tree

3 files changed

+235
-2
lines changed

3 files changed

+235
-2
lines changed

TUTORIAL.md

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,17 @@ guides to this may be found online.
5252

5353
3.5 [Queue](./TUTORIAL.md#35-queue)
5454

55+
3.6 [Task cancellation](./TUTORIAL.md#36-task-cancellation)
56+
5557
4. [Designing classes for asyncio](./TUTORIAL.md#4-designing-classes-for-asyncio)
5658

5759
4.1 [Awaitable classes](./TUTORIAL.md#41-awaitable-classes)
5860

5961
4.2 [Asynchronous iterators](./TUTORIAL.md#42-asynchronous-iterators)
6062

6163
4.3 [Asynchronous context managers](./TUTORIAL.md#43-asynchronous-context-managers)
64+
65+
4.4 [Coroutines with timeouts](./TUTORIAL.md#44-coroutines-with-timeouts)
6266

6367
5. [Device driver examples](./TUTORIAL.md#5-device-driver-examples)
6468

@@ -440,6 +444,13 @@ multiple instances of ``report`` print their result and pause until the other
440444
instances are also complete. At that point the callback runs. On its completion
441445
the coros resume.
442446

447+
A special case of `Barrier` usage is where some coros are allowed to pass the
448+
barrier, registering the fact that they have done so. At least one coro must
449+
wait on the barrier. It will continue execution when all non-waiting coros have
450+
passed the barrier, and all other waiting coros have reached it. This can be of
451+
use when cancelling coros. A coro which cancels others might wait until all
452+
cancelled coros have passed the barrier as they quit.
453+
443454
###### [Jump to Contents](./TUTORIAL.md#contents)
444455

445456
## 3.4 Semaphore
@@ -498,6 +509,71 @@ An example of its use is provided in ``aqtest.py``.
498509

499510
###### [Jump to Contents](./TUTORIAL.md#contents)
500511

512+
## 3.6 Task cancellation
513+
514+
At the time of writing (12th Nov 2017) this requires PR #3380 and
515+
micropython-lib PR #221 which are yet to be merged.
516+
517+
The `uasyncio` library supports task cancellation by throwing an exception to
518+
the coro which is to be cancelled. The latter must trap the exception and
519+
(after performing any cleanup) terminate. The use of this mechanism is
520+
facilitated by the `Cancellable` class which enables a coro to be associated
521+
with a user-defined name for cancellation. Examples of its usage may be found
522+
in `asyntest.py`.
523+
524+
A cancellable coro is instantiated from a normal coro `foo()` by means of
525+
`Cancellable(foo(5), 'foo')`. Note the passing of an argument to the coro. A
526+
coro can `await` such a task with `await Cancellable(foo(5), 'foo')`.
527+
Alternatively a cancellable task can be scheduled for execution with
528+
`loop.create_task(Cancellable(foo(5), 'foo').task)`
529+
530+
In either case the coro with the user-defined name 'foo' is cancelled with
531+
`Cancellable.cancel('foo')`. The coro `foo` will receive the `CancelError` when
532+
it next runs. This means that in real time, and from the point of view of the
533+
coro which has cancelled it, cancellation may not be immediate. In some
534+
situations this may matter. Synchronisation may be achieved using the `Barrier`
535+
class, with the cancelling task pausing until all the coros it has cancelled
536+
have processed the exception. The following - adapted from `asyntest.py` -
537+
illustrates this.
538+
539+
```python
540+
import uasyncio as asyncio
541+
from asyn import Barrier, Cancellable, CancelError
542+
543+
async def forever(n):
544+
print('Started forever() instance', n)
545+
while True: # Run until cancelled. Error propagates to caller.
546+
await asyncio.sleep(7 + n)
547+
print('Running instance', n)
548+
549+
barrier = Barrier(3) # 3 tasks share the barrier
550+
551+
async def rats(n):
552+
# Cancellable coros must trap the CancelError
553+
try:
554+
await forever(n) # Error propagates up from forever()
555+
except CancelError:
556+
await barrier(nowait = True) # Quit immediately
557+
print('Instance', n, 'was cancelled')
558+
559+
async def run_cancel_test2():
560+
loop = asyncio.get_event_loop()
561+
loop.create_task(Cancellable(rats(1), 'rats_1').task)
562+
loop.create_task(Cancellable(rats(2), 'rats_2').task)
563+
print('Running two tasks')
564+
await asyncio.sleep(10)
565+
print('About to cancel tasks')
566+
Cancellable.cancel('rats_1')
567+
Cancellable.cancel('rats_2')
568+
await barrier # Continue when dependent tasks have quit
569+
print('tasks were cancelled')
570+
571+
loop = asyncio.get_event_loop()
572+
loop.run_until_complete(run_cancel_test2())
573+
```
574+
575+
###### [Jump to Contents](./TUTORIAL.md#contents)
576+
501577
# 4 Designing classes for asyncio
502578

503579
In the context of device drivers the aim is to ensure nonblocking operation.
@@ -628,6 +704,43 @@ to completion. The error appears to be in PEP492. See
628704

629705
###### [Jump to Contents](./TUTORIAL.md#contents)
630706

707+
## 4.4 Coroutines with timeouts
708+
709+
At the time of writing (12th Nov 2017) this requires PR #3380 and
710+
micropython-lib PR #221 which are yet to be merged.
711+
712+
Timeouts are implemented by means of `uasyncio.wait_for()`. This takes as
713+
arguments a coroutine and a timeout in seconds. If the timeout expires a
714+
`TimeoutError` will be thrown to the coro. The next time the coro is scheduled
715+
for execution the exception will be raised: the coro should trap this and quit.
716+
717+
```python
718+
import uasyncio as asyncio
719+
720+
async def forever():
721+
print('Starting')
722+
try:
723+
while True:
724+
await asyncio.sleep_ms(300)
725+
print('Got here')
726+
except asyncio.TimeoutError:
727+
print('Got timeout')
728+
729+
async def foo():
730+
await asyncio.wait_for(forever(), 5)
731+
await asyncio.sleep(2)
732+
733+
loop = asyncio.get_event_loop()
734+
loop.run_until_complete(foo())
735+
```
736+
737+
Note that if the coro awaits a long delay, it will not be rescheduled until the
738+
time has elapsed. The `TimeoutError` will occur as soon as it is scheduled. But
739+
in real time and from the point of view of the calling coro, its response to
740+
the `TimeoutError` will be correspondingly delayed.
741+
742+
###### [Jump to Contents](./TUTORIAL.md#contents)
743+
631744
# 5 Device driver examples
632745

633746
Many devices such as sensors are read-only in nature and need to be polled to

asyn.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,29 +109,49 @@ def value(self):
109109
# A Barrier synchronises N coros. Each issues await barrier
110110
# execution pauses until all other participant coros are waiting on it.
111111
# At that point the callback is executed. Then the barrier is 'opened' and
112-
# excution of all participants resumes.
112+
# execution of all participants resumes.
113+
# The nowait arg enables usage where one or more coros can register that
114+
# they have reached the barrier without waiting for it. Other coros waiting
115+
# normally on the barrier will pause until all non-waiting coros have passed
116+
# the barrier and all waiting ones have reached it.
117+
# The use of nowait promotes efficiency by enabling tasks to leave the task
118+
# queue as soon as possible.
119+
113120
# Uses low_priority if available
121+
114122
class Barrier():
115123
def __init__(self, participants, func=None, args=()):
116124
self._participants = participants
117125
self._func = func
118126
self._args = args
119127
self._reset(True)
128+
self._nowait = False
120129

121130
def __await__(self):
122131
self._update()
123132
if self._at_limit(): # All other threads are also at limit
133+
self._nowait = False
124134
if self._func is not None:
125135
launch(self._func, self._args)
126136
self._reset(not self._down)
127137
return
128138

139+
if self._nowait:
140+
self._nowait = False
141+
if self._func is not None:
142+
launch(self._func, self._args)
143+
return
144+
129145
direction = self._down
130146
while True: # Wait until last waiting thread changes the direction
131147
if direction != self._down:
132148
return
133149
yield from after(0)
134150

151+
def __call__(self, nowait=False): # Enable await barrier(nowait = True)
152+
self._nowait = nowait
153+
return self
154+
135155
__iter__ = __await__
136156

137157
def _reset(self, down):
@@ -179,6 +199,36 @@ def release(self):
179199
else:
180200
raise ValueError('Semaphore released more than acquired')
181201

202+
# Task cancellation. At the time of writing this is dependent on PR #3380 and
203+
# micropython-lib PR #221 which are not yet merged.
204+
# A coro foo is made cancellable by isssuing Cancellable(foo(5), 'foo'), the
205+
# second arg being a name used when the task is to be cancelled. This is done
206+
# by issuing Cancellable.cancel('foo'). See asyntest.py and tutorial.
207+
208+
class CancelError(Exception):
209+
pass
210+
211+
class Cancellable():
212+
tasks = {}
213+
@classmethod
214+
def cancel(cls, taskname):
215+
if taskname in cls.tasks:
216+
cls.tasks.pop(taskname).pend_throw(CancelError)
217+
return True
218+
return False
219+
220+
def __init__(self, task, name):
221+
self.tasks[name] = task
222+
self.task = task
223+
224+
def __await__(self):
225+
res = yield from self.task
226+
return res
227+
228+
__iter__ = __await__
229+
230+
# ExitGate is obsolescent - task cancellation is about to be implemented.
231+
# Retained for compatibilty with existing applications.
182232
# uasyncio does not have a mechanism whereby a task can be terminated by another.
183233
# This can cause an issue if a task instantiates other tasks then terminates:
184234
# the child tasks continue to run, which may not be desired. The ExitGate helps

asyntest.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
# event_test()
88
# barrier_test()
99
# semaphore_test() Pass True to test BoundedSemaphore class
10+
# cancel_test1() Awaiting cancellable coros
11+
# cancel_test2() Cancellable coros as tasks and using barrier to synchronise.
1012
# Issue ctrl-D after running each test
1113

1214
# CPython 3.5 compatibility
@@ -16,7 +18,7 @@
1618
except ImportError:
1719
import asyncio
1820

19-
from asyn import Lock, Event, Semaphore, BoundedSemaphore, Barrier
21+
from asyn import Lock, Event, Semaphore, BoundedSemaphore, Barrier, Cancellable, CancelError
2022

2123
# ************ Test Event class ************
2224
# Demo use of acknowledge event
@@ -150,3 +152,71 @@ async def run_sema_test(bounded):
150152
def semaphore_test(bounded=False):
151153
loop = asyncio.get_event_loop()
152154
loop.run_until_complete(run_sema_test(bounded))
155+
156+
# ************ Cancellation tests ************
157+
158+
async def foo(num):
159+
try:
160+
await asyncio.sleep(4)
161+
return num + 42
162+
except CancelError:
163+
print('foo was cancelled.')
164+
return -1
165+
166+
def kill(task_name):
167+
if Cancellable.cancel(task_name):
168+
print(task_name, 'will be cancelled when next scheduled')
169+
else:
170+
print(task_name, 'was not cancellable.')
171+
172+
# Example of a task which cancels another
173+
async def bar():
174+
await asyncio.sleep(1)
175+
kill('foo')
176+
kill('not_me') # Will fail because not yet scheduled
177+
178+
async def run_cancel_test1():
179+
loop = asyncio.get_event_loop()
180+
loop.create_task(bar())
181+
res = await Cancellable(foo(5), 'foo')
182+
print(res)
183+
res = await Cancellable(foo(0), 'not_me') # Runs to completion
184+
print(res)
185+
186+
def cancel_test1():
187+
loop = asyncio.get_event_loop()
188+
loop.run_until_complete(run_cancel_test1())
189+
190+
# TEST with barrier
191+
192+
async def forever(n):
193+
print('Started forever() instance', n)
194+
while True:
195+
await asyncio.sleep(7 + n)
196+
print('Running instance', n)
197+
198+
barrier = Barrier(3)
199+
200+
# Cancellable coros must trap the CancelError
201+
async def rats(n):
202+
try:
203+
await forever(n)
204+
except CancelError:
205+
await barrier(nowait = True)
206+
print('Instance', n, 'was cancelled')
207+
208+
async def run_cancel_test2():
209+
loop = asyncio.get_event_loop()
210+
loop.create_task(Cancellable(rats(1), 'rats_1').task)
211+
loop.create_task(Cancellable(rats(2), 'rats_2').task)
212+
print('Running two tasks')
213+
await asyncio.sleep(10)
214+
print('About to cancel tasks')
215+
Cancellable.cancel('rats_1')
216+
Cancellable.cancel('rats_2')
217+
await barrier
218+
print('tasks were cancelled')
219+
220+
def cancel_test2():
221+
loop = asyncio.get_event_loop()
222+
loop.run_until_complete(run_cancel_test2())

0 commit comments

Comments
 (0)