Skip to content

extmod/asyncio: Add Task methods from CPython #13000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions extmod/asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class TimeoutError(Exception):
pass


class InvalidStateError(Exception):
pass


# Used when calling Loop.call_exception_handler
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}

Expand Down
73 changes: 73 additions & 0 deletions extmod/asyncio/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,76 @@ def cancel(self):
core._task_queue.push(self)
self.data = core.CancelledError
return True

def add_done_callback(self, callback):
if not self.state:
callback(self, self.data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This signature of the callback does not match CPython. In CPython the callback takes only one argument.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right - I'm hoping to follow up to bring this in line with CPython in cases where only one argument is accepted in the function in a follow up PR.

However, changing the callback signature here would mean it's different from where it gets called elsewhere.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add a comment to clarify that if it helps?


if self.state is not True:
raise RuntimeError("Tasks only support one done callback.")

self.state = callback

def remove_done_callback(self, callback):
if self.state is not callback:
return 0

self.state = True
return 1

def result(self):
"""
Return the result of the Task.
If the Task is done, the result of the wrapped coroutine is returned (or if the coroutine raised an exception, that exception is re-raised.)
If the Task has been cancelled, this method raises a CancelledError exception.
If the Task’s result isn’t yet available, this method raises a InvalidStateError exception.
"""
if not self.done():
raise core.InvalidStateError()

exception = self.exception()

if exception is not None:
raise exception

if not isinstance(self.data, StopIteration):
# If this isn't the case then we're in an odd state.
return None

return self.data.value

def exception(self):
"""
Return the exception that was set on this Task.
The exception (or None if no exception was set) is returned only if the Task is done.
If the Task has been cancelled, this method raises a CancelledError exception.
If the Task isn’t done yet, this method raises an InvalidStateError exception.
"""
if not self.done():
raise core.InvalidStateError()

if isinstance(self.data, core.CancelledError):
raise self.data

if isinstance(self.data, StopIteration):
# If the data is a stop iteration we can assume this
# was a successful run rather than any possible exception
return None

if not isinstance(self.data, BaseException):
# If the data is not any type of exception we can treat it as
# something else we don't understand but not an exception.
return None

return self.data

def cancelled(self) -> bool:
"""
Return True if the Task is cancelled.
The Task is cancelled when the cancellation was requested with cancel() and
the wrapped coroutine propagated the CancelledError exception thrown into it.
"""
if not self.done():
return False

return isinstance(self.data, core.CancelledError)
132 changes: 132 additions & 0 deletions extmod/modasyncio.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
(task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
|| (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)

#define IS_CANCELLED_ERROR(error) ( \
mp_obj_is_subclass_fast( \
MP_OBJ_FROM_PTR(mp_obj_get_type(error)), \
mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError)) \
))

typedef struct _mp_obj_task_t {
mp_pairheap_t pairheap;
mp_obj_t coro;
Expand Down Expand Up @@ -179,6 +185,114 @@ STATIC mp_obj_t task_done(mp_obj_t self_in) {
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);

STATIC mp_obj_t task_add_done_callback(mp_obj_t self_in, mp_obj_t callback) {
assert(mp_obj_is_callable(callback));
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (TASK_IS_DONE(self)) {
// In CPython the callbacks are not immediately called and are instead
// called by the event loop. However, MicroPython's event loop doesn't
// support `call_soon` to handle callback processing.
//
// Because of this, it's close enough to call the callback immediately.

mp_call_function_2(callback, self_in, self->data);
return mp_const_none;
}

if (self->state != mp_const_true) {
// Tasks SHOULD support more than one callback per CPython but to reduce
// the surface area of this change tasks can currently only support one.
mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT(">1 callback unsupported"));
}

self->state = callback;
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_add_done_callback_obj, task_add_done_callback);

STATIC mp_obj_t task_remove_done_callback(mp_obj_t self_in, mp_obj_t callback) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (callback != self->state) {
// If the callback isn't a match we can count this as removing 0 callbacks
return mp_obj_new_int(0);
}

self->state = mp_const_true;
return mp_obj_new_int(1);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_remove_done_callback_obj, task_remove_done_callback);

STATIC mp_obj_t task_get_coro(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
return MP_OBJ_FROM_PTR(self->coro);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_get_coro_obj, task_get_coro);

STATIC mp_obj_t task_exception(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}

// If the exception is a cancelled error then we should raise it
if (IS_CANCELLED_ERROR(self->data)) {
nlr_raise(mp_make_raise_obj(self->data));
}

// If it's a StopIteration we should should return none
if (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}

if (!mp_obj_is_exception_instance(self->data)) {
return mp_const_none;
}

return self->data;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_exception_obj, task_exception);

STATIC mp_obj_t task_result(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}

// If `exception()` returns anything we raise that
mp_obj_t exception_obj = task_exception(self_in);
if (exception_obj != mp_const_none) {
nlr_raise(mp_make_raise_obj(exception_obj));
}

// If not StopIteration, bail early
if (!mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}

return mp_obj_exception_get_value(self->data);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_result_obj, task_result);

STATIC mp_obj_t task_cancelled(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
// If task isn't done it can't possibly be cancelled, and would instead
// be considered "cancelling" even if a cancel was requested until it
// has fully completed.
return mp_obj_new_bool(false);
}

return mp_obj_new_bool(IS_CANCELLED_ERROR(self->data));
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancelled_obj, task_cancelled);

STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
// Check if task is already finished.
Expand Down Expand Up @@ -242,6 +356,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
dest[1] = self_in;
} else if (attr == MP_QSTR_ph_key) {
dest[0] = self->ph_key;
} else if (attr == MP_QSTR_add_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_add_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_remove_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_remove_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_get_coro) {
dest[0] = MP_OBJ_FROM_PTR(&task_get_coro_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_result) {
dest[0] = MP_OBJ_FROM_PTR(&task_result_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_exception) {
dest[0] = MP_OBJ_FROM_PTR(&task_exception_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_cancelled) {
dest[0] = MP_OBJ_FROM_PTR(&task_cancelled_obj);
dest[1] = self_in;
}
} else if (dest[1] != MP_OBJ_NULL) {
// Store
Expand Down
54 changes: 54 additions & 0 deletions tests/extmod/asyncio_task_add_done_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Test the Task.add_done_callback() method

try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit


async def task(t, exc=None):
if t >= 0:
await asyncio.sleep(t)
if exc:
raise exc


def done_callback(t, er):
print("done", repr(t), repr(er))


async def main():
# Tasks that aren't done only execute done callback after finishing
print("=" * 10)
t = asyncio.create_task(task(-1))
t.add_done_callback(done_callback)
print("Waiting for task to complete")
await asyncio.sleep(0)
print("Task has completed")

# Task that are done run the callback immediately
print("=" * 10)
t = asyncio.create_task(task(-1))
await asyncio.sleep(0)
print("Task has completed")
t.add_done_callback(done_callback)
print("Callback Added")

# Task that starts, runs and finishes without an exception should return None
print("=" * 10)
t = asyncio.create_task(task(0.01))
t.add_done_callback(done_callback)
try:
t.add_done_callback(done_callback)
except RuntimeError as e:
print("Second call to add_done_callback emits error:", repr(e))

# Task that raises immediately should still run done callback
print("=" * 10)
t = asyncio.create_task(task(-1, ValueError))
t.add_done_callback(done_callback)
await asyncio.sleep(0)


asyncio.run(main())
12 changes: 12 additions & 0 deletions tests/extmod/asyncio_task_add_done_callback.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
==========
Waiting for task to complete
done <Task> StopIteration()
Task has completed
==========
Task has completed
done <Task> StopIteration()
Callback Added
==========
Second call to add_done_callback emits error: RuntimeError('>1 callback unsupported',)
==========
done <Task> ValueError()
54 changes: 54 additions & 0 deletions tests/extmod/asyncio_task_cancelled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Test the `Task.cancelled` method

try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit


async def task(t):
await asyncio.sleep(t)


async def main():
# Cancel task immediately doesn't mark the task as cancelled
print("=" * 10)
t = asyncio.create_task(task(2))
t.cancel()
print("Expecting task to not be cancelled because it is not done:", t.cancelled())

# Cancel task immediately and wait for cancellation to complete
print("=" * 10)
t = asyncio.create_task(task(2))
t.cancel()
await asyncio.sleep(0)
print("Expecting Task to be Cancelled:", t.cancelled())

# Cancel task and wait for cancellation to complete
print("=" * 10)
t = asyncio.create_task(task(2))
await asyncio.sleep(0.01)
t.cancel()
await asyncio.sleep(0)
print("Expecting Task to be Cancelled:", t.cancelled())

# Cancel task multiple times after it has started
print("=" * 10)
t = asyncio.create_task(task(2))
await asyncio.sleep(0.01)
for _ in range(4):
t.cancel()
await asyncio.sleep(0.01)

print("Expecting Task to be Cancelled:", t.cancelled())

# Cancel task after it has finished
print("=" * 10)
t = asyncio.create_task(task(0.01))
await asyncio.sleep(0.05)
t.cancel()
print("Expecting task to not be Cancelled:", t.cancelled())


asyncio.run(main())
10 changes: 10 additions & 0 deletions tests/extmod/asyncio_task_cancelled.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
==========
Expecting task to not be cancelled because it is not done: False
==========
Expecting Task to be Cancelled: True
==========
Expecting Task to be Cancelled: True
==========
Expecting Task to be Cancelled: True
==========
Expecting task to not be Cancelled: False
Loading