Skip to content
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ incremental in minor, bugfixes only are patches.
See [0Ver](https://0ver.org/).


## 0.26.0

### Features

- Add `returns.methods.gather` utility method


## 0.25.0

### Features
Expand Down
38 changes: 32 additions & 6 deletions docs/pages/methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,45 @@ Here's a full example:
partition
~~~~~~~~~

:func:`partition <returns.result.partition>` is used to convert
:func:`partition <returns.methods.partition>` is used to convert
list of :class:`~returns.interfaces.Unwrappable`
instances like :class:`~returns.result.Result`,
:class:`~returns.io.IOResult`, and :class:`~returns.maybe.Maybe`
to a tuple of two lists: successes and failures.

.. code:: python

>>> from returns.result import Failure, Success
>>> from returns.methods import partition
>>> results = [Success(1), Failure(2), Success(3), Failure(4)]
>>> partition(results)
([1, 3], [2, 4])
>>> from returns.result import Failure, Success
>>> from returns.methods import partition
>>> results = [Success(1), Failure(2), Success(3), Failure(4)]
>>> partition(results)
([1, 3], [2, 4])

gather
~~~~~~

:func:`gather <returns.methods.gather>` is used to safely concurrently
execute multiple awaitable objects(any object with ``__await__`` method,
included function marked with async keyword) and return a tuple of wrapped results
:class: `~returns.io.IOResult`.
Embrace railway-oriented programming princple of executing as many IO operations
as possible before synchrounous computations.

.. code:: python

>>> import anyio
>>> from returns.io import IO, IOSuccess, IOFailure
>>> from returns.result import Failure, Success
>>> from returns.methods import gather

>>> async def coro():
... return 1
>>> async def coro_raise():
... raise ValueError(2)
>>> anyio.run(gather,[coro(), coro_raise()])
(<IOResult: <Success: 1>>, <IOResult: <Failure: 2>>)



API Reference
-------------
Expand Down
3 changes: 3 additions & 0 deletions returns/methods/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""Set of various utility functions."""

from returns.methods.async_ import gather as gather
from returns.methods.cond import cond as cond
from returns.methods.partition import partition as partition
from returns.methods.unwrap_or_failure import (
Expand Down
39 changes: 39 additions & 0 deletions returns/methods/async_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# flake8: noqa: WPS102

from collections.abc import Awaitable, Iterable

import anyio

from returns.io import IOResult


async def gather(
containers: Iterable[Awaitable,],
) -> tuple[IOResult, ...]:
"""
Execute multiple coroutines concurrently and return their wrapped results.

.. code:: python

>>> import anyio
>>> from returns.methods import gather
>>> from returns.io import IOSuccess

>>> async def coro():
... return 1
>>> assert anyio.run(gather, [coro()]) == (IOSuccess(1), )
"""
async with anyio.create_task_group() as tg:
ioresults: dict[int, IOResult] = {}

async def _run_task(coro: Awaitable, index: int): # noqa: WPS430
ioresult: IOResult
try:
ioresult = IOResult.from_value(await coro)
except Exception as exc:
ioresult = IOResult.from_failure(exc)
ioresults[index] = ioresult

for coro_index, coro in enumerate(containers):
tg.start_soon(_run_task, coro, coro_index)
return tuple(ioresults[key] for key in sorted(ioresults.keys()))
43 changes: 43 additions & 0 deletions tests/test_methods/test_gather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import anyio
import pytest

from returns.future import FutureResult
from returns.io import IOResult
from returns.methods import gather
from returns.result import Result


async def _helper_func1() -> str:
return 'successful function'


async def _helper_func2() -> str:
return 'failed function'


@pytest.mark.parametrize(
('containers', 'expected'),
[
(
(
FutureResult.from_value(1),
FutureResult.from_failure(None),
),
(IOResult.from_value(1), IOResult.from_failure(None)),
),
((), ()),
(
(
_helper_func1(),
_helper_func2(),
),
(
IOResult.from_result(Result.from_value('successful function')),
IOResult.from_result(Result.from_failure('failed function')),
),
),
],
)
def test_gather(containers, expected):
"""Test partition function."""
assert anyio.run(gather, containers) == expected
Loading