Skip to content

update asyncio from MicroPython v1.19.1 #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __next__(self):
self.exc.__traceback__ = None
raise self.exc


# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
Expand Down Expand Up @@ -265,6 +266,11 @@ def run_until_complete(main_task=None):
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
Expand Down
5 changes: 1 addition & 4 deletions asyncio/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ class Event:

def __init__(self):
self.state = False # False=unset; True=set
self.waiting = (
core.TaskQueue()
) # Queue of Tasks waiting on completion of this event
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event

def is_set(self):
"""Returns ``True`` if the event is set, ``False`` otherwise."""
Expand Down Expand Up @@ -90,6 +88,5 @@ async def wait(self):
yield core._io_queue.queue_read(self)
self._flag = 0


except ImportError:
pass
126 changes: 87 additions & 39 deletions asyncio/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# MIT license; Copyright (c) 2019-2022 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
Expand All @@ -19,6 +19,22 @@
from . import core


async def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)

async def wait_for(aw, timeout, sleep=core.sleep):
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
than *timeout* seconds. If *aw* is not a task then a task will be created
Expand All @@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep):
if timeout is None:
return await aw

async def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
s = True
except BaseException as er:
s = er
if status is None:
# The waiter is still waiting, set status for it and cancel it.
status = s
waiter.cancel()

# Run aw in a separate runner task that manages its exceptions.
status = None
result = None
runner_task = core.create_task(runner(core.cur_task, aw))
runner_task = core.create_task(_run(core.cur_task, aw))

try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
if status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return result
elif status is None:
status = er.args[0] if er.args else None
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
status = True
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status

# The sleep finished before aw, so cancel aw and raise TimeoutError.
status = True
runner_task.cancel()
await runner_task
raise core.TimeoutError
Expand All @@ -85,30 +86,77 @@ def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)


class _Remove:
@staticmethod
def remove(t):
pass


async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.

Returns a list of return values of all *aws*

This is a coroutine.
"""
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push_head(gather_task)

ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except (core.CancelledError, Exception) as er:
if return_exceptions:
ts[i] = er
else:
raise er
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done

# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False

# Wait for the a sub-task to need attention.
gather_task.data = _Remove
try:
await core._never()
except core.CancelledError as er:
cancel_all = True
state = er

# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data

# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state

# Return the list of return values of each sub-task.
return ts
4 changes: 1 addition & 3 deletions asyncio/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ async def open_connection(host, port):
from uerrno import EINPROGRESS
import usocket as socket

ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[
0
] # TODO this is blocking!
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
Expand Down
41 changes: 26 additions & 15 deletions asyncio/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,27 @@ def __init__(self):
def peek(self):
return self.heap

def push_sorted(self, v, key):
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)

def push_head(self, v):
self.push_sorted(v, core.ticks())

def pop_head(self):
def pop(self):
v = self.heap
self.heap = ph_pairing(self.heap.ph_child)
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v

def remove(self, v):
self.heap = ph_delete(self.heap, v)

# Compatibility aliases, remove after they are no longer used
push_head = push
push_sorted = push
pop_head = pop

# Task class representing a coroutine, can be waited on and cancelled.
class Task:
Expand All @@ -144,7 +147,7 @@ class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True or a TaskQueue instance
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
Expand All @@ -158,17 +161,25 @@ def __iter__(self):
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self

# CircuitPython needs __await()__.
__await__ = __iter__

def __next__(self):
if not self.state:
# Task finished, raise return value to caller so it can continue.
raise self.data
if self.data is None:
# Task finished but has already been sent to the loop's exception handler.
raise StopIteration
else:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.state.push_head(core.cur_task)
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self

Expand All @@ -195,10 +206,10 @@ def cancel(self):
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True