From 71962ce4df30bc3fb6445e63a8bde2a99947f6d0 Mon Sep 17 00:00:00 2001 From: Dino Viehland Date: Tue, 3 May 2022 09:21:13 -0700 Subject: [PATCH 1/3] Add basic eager async evaluation to Tasks (Python only) and TaskGroups Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: --- Lib/asyncio/taskgroups.py | 59 +++++++++++++++ Lib/asyncio/tasks.py | 95 +++++++++++++----------- Lib/test/test_asyncio/test_taskgroups.py | 95 ++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 45 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 6af21f3a15d93a..13994afcb0241f 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -8,6 +8,12 @@ from . import events from . import exceptions from . import tasks +from .futures import Future + + +class PyCoroEagerResult: + def __init__(self, value): + self.value = value class TaskGroup: @@ -24,6 +30,7 @@ def __init__(self): self._errors = [] self._base_error = None self._on_completed_fut = None + self._enqueues = {} def __repr__(self): info = [''] @@ -57,6 +64,16 @@ async def __aenter__(self): return self + def eager_eval(self, coro): + try: + fut = coro.send(None) + except StopIteration as e: + return PyCoroEagerResult(e.args[0] if e.args else None) + else: + task = self.create_task(coro) + task._set_fut_awaiter(fut) + return task + async def __aexit__(self, et, exc, tb): self._exiting = True propagate_cancellation_error = None @@ -89,6 +106,23 @@ async def __aexit__(self, et, exc, tb): # self._abort() + if self._enqueues: + for coro in self._enqueues: + res = self.eager_eval(coro) + if isinstance(res, PyCoroEagerResult): + fut = self._enqueues[coro] + if fut is not None: + fut.set_result(res.value) + else: + def queue_callback(): + fut = self._enqueues[coro] + if fut is not None: + res.add_done_callback(lambda task: fut.set_result(task.result())) + queue_callback() + + self._unfinished_tasks -= len(self._enqueues) + self._enqueues.clear() + # We use while-loop here because "self._on_completed_fut" # can be cancelled multiple times if our parent task # is being cancelled repeatedly (or even once, when @@ -153,6 +187,31 @@ def create_task(self, coro, *, name=None, context=None): self._tasks.add(task) return task + def enqueue(self, coro, no_future=True): + if not self._entered: + raise RuntimeError(f"TaskGroup {self!r} has not been entered") + + if coro in self._enqueues: + return self._enqueues[coro] + + if not self._enqueues: + # if the looop starts running because someone awaits, we want + # to run the co-routines which are enqueued as well. + self._loop.call_soon(self._enqueue_enqueus) + + self._unfinished_tasks += 1 + if no_future: + fut = self._enqueues[coro] = None + else: + fut = self._enqueues[coro] = Future(loop=self._loop) + return fut + + def _enqueue_enqueus(self): + for coro in self._enqueues: + self.create_task(coro) + self._unfinished_tasks -= len(self._enqueues) + self._enqueues.clear() + # Since Python 3.8 Tasks propagate all exceptions correctly, # except for KeyboardInterrupt and SystemExit which are # still considered special. diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 3952b5f2a7743d..9ad8c88fd6772f 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -117,7 +117,9 @@ def __init__(self, coro, *, loop=None, name=None, context=None): else: self._context = context - self._loop.call_soon(self.__step, context=self._context) + if not getattr(coro, "cr_suspended", False): + self._loop.call_soon(self.__step, context=self._context) + _register_task(self) def __del__(self): @@ -293,55 +295,58 @@ def __step(self, exc=None): except BaseException as exc: super().set_exception(exc) else: - blocking = getattr(result, '_asyncio_future_blocking', None) - if blocking is not None: - # Yielded Future must come from Future.__iter__(). - if not self._check_future(result): - new_exc = RuntimeError( - f'Task {self!r} got Future ' - f'{result!r} attached to a different loop') - self._loop.call_soon( - self.__step, new_exc, context=self._context) - elif blocking: - if result is self: - new_exc = RuntimeError( - f'Task cannot await on itself: {self!r}') - self._loop.call_soon( - self.__step, new_exc, context=self._context) - else: - result._asyncio_future_blocking = False - result.add_done_callback( - self.__wakeup, context=self._context) - self._fut_waiter = result - if self._must_cancel: - if self._fut_waiter.cancel( - msg=self._cancel_message): - self._must_cancel = False - else: - new_exc = RuntimeError( - f'yield was used instead of yield from ' - f'in task {self!r} with {result!r}') - self._loop.call_soon( - self.__step, new_exc, context=self._context) + self._set_fut_awaiter(result) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. - elif result is None: - # Bare yield relinquishes control for one event loop iteration. - self._loop.call_soon(self.__step, context=self._context) - elif inspect.isgenerator(result): - # Yielding a generator is just wrong. + def _set_fut_awaiter(self, result): + blocking = getattr(result, '_asyncio_future_blocking', None) + if blocking is not None: + # Yielded Future must come from Future.__iter__(). + if not self._check_future(result): new_exc = RuntimeError( - f'yield was used instead of yield from for ' - f'generator in task {self!r} with {result!r}') + f'Task {self!r} got Future ' + f'{result!r} attached to a different loop') self._loop.call_soon( self.__step, new_exc, context=self._context) + elif blocking: + if result is self: + new_exc = RuntimeError( + f'Task cannot await on itself: {self!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + result._asyncio_future_blocking = False + result.add_done_callback( + self.__wakeup, context=self._context) + self._fut_waiter = result + if self._must_cancel: + if self._fut_waiter.cancel( + msg=self._cancel_message): + self._must_cancel = False else: - # Yielding something else is an error. - new_exc = RuntimeError(f'Task got bad yield: {result!r}') + new_exc = RuntimeError( + f'yield was used instead of yield from ' + f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) - finally: - _leave_task(self._loop, self) - self = None # Needed to break cycles when an exception occurs. + + elif result is None: + # Bare yield relinquishes control for one event loop iteration. + self._loop.call_soon(self.__step, context=self._context) + elif inspect.isgenerator(result): + # Yielding a generator is just wrong. + new_exc = RuntimeError( + f'yield was used instead of yield from for ' + f'generator in task {self!r} with {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + # Yielding something else is an error. + new_exc = RuntimeError(f'Task got bad yield: {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) def __wakeup(self, future): try: @@ -369,8 +374,8 @@ def __wakeup(self, future): pass else: # _CTask is needed for tests. - Task = _CTask = _asyncio.Task - + #Task = _CTask = _asyncio.Task + pass def create_task(coro, *, name=None, context=None): """Schedule the execution of a coroutine object in a spawn task. diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 69369a6100a8fd..999a5583e88ffa 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -722,6 +722,101 @@ async def coro(val): await t2 self.assertEqual(2, ctx.get(cvar)) + async def test_taskgroup_enqueue_01(self): + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def eager(): + return 11 + + async with taskgroups.TaskGroup() as g: + t1 = g.enqueue(foo1(), no_future=False) + t2 = g.enqueue(eager(), no_future=False) + + self.assertEqual(t1.result(), 42) + self.assertEqual(t2.result(), 11) + + async def test_taskgroup_enqueue_02(self): + + async def foo1(): + return 42 + + async def eager(): + return 11 + + async with taskgroups.TaskGroup() as g: + t1 = g.enqueue(foo1(), no_future=False) + t2 = g.enqueue(eager(), no_future=False) + + self.assertEqual(t1.result(), 42) + self.assertEqual(t2.result(), 11) + + async def test_taskgroup_fanout_task(self): + async def step(i): + if i == 0: + return + async with taskgroups.TaskGroup() as g: + for _ in range(6): + g.create_task(step(i - 1)) + + import time + s = time.perf_counter() + await step(6) + e = time.perf_counter() + print(e-s) + + async def test_taskgroup_fanout_enqueue(self): + async def step(i): + if i == 0: + return + async with taskgroups.TaskGroup() as g: + for _ in range(6): + g.enqueue(step(i - 1)) + + import time + s = time.perf_counter() + await step(6) + e = time.perf_counter() + print(e-s) + + async def test_taskgroup_fanout_enqueue_02(self): + async def intermediate2(i): + return await intermediate(i) + + async def intermediate(i): + async with taskgroups.TaskGroup() as g: + for _ in range(6): + g.enqueue(step(i - 1)) + + async def step(i): + if i == 0: + return + + return await intermediate2(i) + + + import time + s = time.perf_counter() + await step(6) + e = time.perf_counter() + print(e-s) + + + async def test_taskgroup_fanout_enqueue_future(self): + async def step(i): + if i == 0: + return + async with taskgroups.TaskGroup() as g: + for _ in range(6): + g.enqueue(step(i - 1), no_future=False) + + import time + s = time.perf_counter() + await step(6) + e = time.perf_counter() + print(e-s) if __name__ == "__main__": unittest.main() From 78b5e0be800ea3cd3e5e24abb88e561ae9cf07b0 Mon Sep 17 00:00:00 2001 From: Dino Viehland Date: Tue, 3 May 2022 13:30:29 -0700 Subject: [PATCH 2/3] eager evaluate in enqueue Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: --- Lib/asyncio/taskgroups.py | 69 ++++++++---------------- Lib/test/test_asyncio/test_taskgroups.py | 8 +-- 2 files changed, 26 insertions(+), 51 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 13994afcb0241f..55447988962f00 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -30,7 +30,6 @@ def __init__(self): self._errors = [] self._base_error = None self._on_completed_fut = None - self._enqueues = {} def __repr__(self): info = [''] @@ -64,16 +63,6 @@ async def __aenter__(self): return self - def eager_eval(self, coro): - try: - fut = coro.send(None) - except StopIteration as e: - return PyCoroEagerResult(e.args[0] if e.args else None) - else: - task = self.create_task(coro) - task._set_fut_awaiter(fut) - return task - async def __aexit__(self, et, exc, tb): self._exiting = True propagate_cancellation_error = None @@ -106,23 +95,6 @@ async def __aexit__(self, et, exc, tb): # self._abort() - if self._enqueues: - for coro in self._enqueues: - res = self.eager_eval(coro) - if isinstance(res, PyCoroEagerResult): - fut = self._enqueues[coro] - if fut is not None: - fut.set_result(res.value) - else: - def queue_callback(): - fut = self._enqueues[coro] - if fut is not None: - res.add_done_callback(lambda task: fut.set_result(task.result())) - queue_callback() - - self._unfinished_tasks -= len(self._enqueues) - self._enqueues.clear() - # We use while-loop here because "self._on_completed_fut" # can be cancelled multiple times if our parent task # is being cancelled repeatedly (or even once, when @@ -187,30 +159,33 @@ def create_task(self, coro, *, name=None, context=None): self._tasks.add(task) return task + def _eager_eval(self, coro): + try: + fut = coro.send(None) + task = self.create_task(coro) + task._set_fut_awaiter(fut) + return task + except StopIteration as e: + # The co-routine has completed synchronously and we've got + # our result. + return PyCoroEagerResult(e.args[0] if e.args else None) + except Exception as e: + res = Future(loop=self._loop) + res.set_exception(e) + return res + def enqueue(self, coro, no_future=True): if not self._entered: raise RuntimeError(f"TaskGroup {self!r} has not been entered") - if coro in self._enqueues: - return self._enqueues[coro] - - if not self._enqueues: - # if the looop starts running because someone awaits, we want - # to run the co-routines which are enqueued as well. - self._loop.call_soon(self._enqueue_enqueus) - - self._unfinished_tasks += 1 - if no_future: - fut = self._enqueues[coro] = None + res = self._eager_eval(coro) + if isinstance(res, PyCoroEagerResult): + if not no_future: + fut = Future(loop=self._loop) + fut.set_result(res.value) + return fut else: - fut = self._enqueues[coro] = Future(loop=self._loop) - return fut - - def _enqueue_enqueus(self): - for coro in self._enqueues: - self.create_task(coro) - self._unfinished_tasks -= len(self._enqueues) - self._enqueues.clear() + return res # Since Python 3.8 Tasks propagate all exceptions correctly, # except for KeyboardInterrupt and SystemExit which are diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 999a5583e88ffa..dd1df82a9f5300 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -740,15 +740,15 @@ async def eager(): async def test_taskgroup_enqueue_02(self): - async def foo1(): + async def eager1(): return 42 - async def eager(): + async def eager2(): return 11 async with taskgroups.TaskGroup() as g: - t1 = g.enqueue(foo1(), no_future=False) - t2 = g.enqueue(eager(), no_future=False) + t1 = g.enqueue(eager1(), no_future=False) + t2 = g.enqueue(eager2(), no_future=False) self.assertEqual(t1.result(), 42) self.assertEqual(t2.result(), 11) From ce7e4d450a8cd03054d86912bbd7154906fae849 Mon Sep 17 00:00:00 2001 From: Dino Viehland Date: Tue, 3 May 2022 15:45:03 -0700 Subject: [PATCH 3/3] Let the exceptions propagate Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: --- Lib/asyncio/taskgroups.py | 22 +++++----------------- Lib/test/test_asyncio/test_taskgroups.py | 10 ++++++++++ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 55447988962f00..7cda05f218c524 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -159,7 +159,10 @@ def create_task(self, coro, *, name=None, context=None): self._tasks.add(task) return task - def _eager_eval(self, coro): + def enqueue(self, coro, no_future=True): + if not self._entered: + raise RuntimeError(f"TaskGroup {self!r} has not been entered") + try: fut = coro.send(None) task = self.create_task(coro) @@ -168,23 +171,8 @@ def _eager_eval(self, coro): except StopIteration as e: # The co-routine has completed synchronously and we've got # our result. - return PyCoroEagerResult(e.args[0] if e.args else None) - except Exception as e: res = Future(loop=self._loop) - res.set_exception(e) - return res - - def enqueue(self, coro, no_future=True): - if not self._entered: - raise RuntimeError(f"TaskGroup {self!r} has not been entered") - - res = self._eager_eval(coro) - if isinstance(res, PyCoroEagerResult): - if not no_future: - fut = Future(loop=self._loop) - fut.set_result(res.value) - return fut - else: + res.set_result(e.args[0] if e.args else None) return res # Since Python 3.8 Tasks propagate all exceptions correctly, diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index dd1df82a9f5300..3517c0775f0a5e 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -753,6 +753,16 @@ async def eager2(): self.assertEqual(t1.result(), 42) self.assertEqual(t2.result(), 11) + async def test_taskgroup_enqueue_exception(self): + async def foo1(): + 1 / 0 + + with self.assertRaises(ExceptionGroup) as cm: + async with taskgroups.TaskGroup() as g: + g.enqueue(foo1()) + + self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + async def test_taskgroup_fanout_task(self): async def step(i): if i == 0: