Skip to content

Commit a25847d

Browse files
authored
Async Result builder (#247)
* Async results * Add docs. Fix type errors * Add async_option * Add seq concat and update seq collect implemenation
1 parent a39d985 commit a25847d

File tree

10 files changed

+1410
-5
lines changed

10 files changed

+1410
-5
lines changed

README.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@
142142
is amazing stuff.
143143
- **option** - an optional world for working with optional values.
144144
- **result** - an error handling world for working with result values.
145+
- **seq** - a world for working with sequences.
146+
- **async_result** - an asynchronous error handling world for working
147+
with asynchronous result values.
148+
- **async_option** - an asynchronous optional world for working with
149+
asynchronous optional values.
145150
- **Mailbox Processor**: for lock free programming using the [Actor
146151
model](https://en.wikipedia.org/wiki/Actor_model).
147152
- **Cancellation Token**: for cancellation of asynchronous (and
@@ -461,6 +466,145 @@ def fn5() -> Generator[int, int, int]:
461466
pinned to `Exception` i.e., `Result[TSource, Exception]`.
462467
"""
463468

469+
# %% [markdown]
470+
"""
471+
### AsyncResult
472+
473+
The `AsyncResult[T, TError]` type is the asynchronous version of `Result`. It allows you
474+
to compose asynchronous operations that may fail, using the Result type. This is
475+
particularly useful for handling errors in asynchronous code, such as API calls,
476+
database operations, or any other I/O-bound tasks.
477+
478+
Similar to the `Result` effect, AsyncResult enables "railway oriented programming" but
479+
for asynchronous operations. If any part of the function yields an `Error`, the function
480+
is short-circuited and the following statements will never be executed.
481+
"""
482+
483+
# %%
484+
from collections.abc import AsyncGenerator
485+
486+
from expression import Error, Ok, effect
487+
488+
489+
@effect.async_result[int, str]()
490+
async def fn() -> AsyncGenerator[int, int]:
491+
x: int = yield 42 # Regular value
492+
y: int = yield await Ok(43) # Awaitable Ok value
493+
494+
# Short-circuit if condition is met
495+
if x + y > 80:
496+
z: int = yield await Error("Value too large") # This will short-circuit
497+
else:
498+
z: int = yield 44
499+
500+
yield x + y + z # Final value
501+
502+
503+
# This would be run in an async context
504+
# result = await fn()
505+
# assert result == Error("Value too large")
506+
507+
# %% [markdown]
508+
"""
509+
AsyncResult works well with other async functions and can be nested:
510+
"""
511+
512+
513+
# %%
514+
@effect.async_result[int, str]()
515+
async def inner(x: int) -> AsyncGenerator[int, int]:
516+
y: int = yield x + 1
517+
yield y + 1 # Final value is y + 1
518+
519+
520+
@effect.async_result[int, str]()
521+
async def outer() -> AsyncGenerator[int, int]:
522+
x: int = yield 40
523+
524+
# Call inner and await its result
525+
inner_result = await inner(x)
526+
y: int = yield await inner_result
527+
528+
yield y # Final value is y
529+
530+
531+
# This would be run in an async context
532+
# result = await outer()
533+
# assert result == Ok(42) # 40 -> 41 -> 42
534+
535+
# %% [markdown]
536+
"""
537+
A simplified type called `AsyncTry` is also available. It's an async result type that is
538+
pinned to `Exception` i.e., `AsyncResult[TSource, Exception]`.
539+
"""
540+
541+
# %% [markdown]
542+
"""
543+
### AsyncOption
544+
545+
The `AsyncOption[T]` type is the asynchronous version of `Option`. It allows you to
546+
compose asynchronous operations that may return an optional value, using the Option type.
547+
This is particularly useful for handling optional values in asynchronous code, such as
548+
API calls that might not return a value, database queries that might not find a record,
549+
or any other I/O-bound tasks that might not produce a meaningful result.
550+
551+
Similar to the `Option` effect, AsyncOption enables short-circuiting but for asynchronous
552+
operations. If any part of the function yields `Nothing`, the function is short-circuited
553+
and the following statements will never be executed.
554+
"""
555+
556+
# %%
557+
from collections.abc import AsyncGenerator
558+
559+
from expression import Nothing, Some, effect
560+
561+
562+
@effect.async_option[int]()
563+
async def fn_option() -> AsyncGenerator[int, int]:
564+
x: int = yield 42 # Regular value
565+
y: int = yield await Some(43) # Awaitable Some value
566+
567+
# Short-circuit if condition is met
568+
if x + y > 80:
569+
z: int = yield await Nothing # This will short-circuit
570+
else:
571+
z: int = yield 44
572+
573+
yield x + y + z # Final value
574+
575+
576+
# This would be run in an async context
577+
# result = await fn_option()
578+
# assert result is Nothing
579+
580+
# %% [markdown]
581+
"""
582+
AsyncOption works well with other async functions and can be nested:
583+
"""
584+
585+
586+
# %%
587+
@effect.async_option[int]()
588+
async def inner_option(x: int) -> AsyncGenerator[int, int]:
589+
y: int = yield x + 1
590+
yield y + 1 # Final value is y + 1
591+
592+
593+
@effect.async_option[int]()
594+
async def outer_option() -> AsyncGenerator[int, int]:
595+
x: int = yield 40
596+
597+
# Call inner and await its result
598+
inner_result = await inner_option(x)
599+
y: int = yield await inner_result
600+
601+
yield y # Final value is y
602+
603+
604+
# This would be run in an async context
605+
# result = await outer_option()
606+
# assert result == Some(42) # 40 -> 41 -> 42
607+
464608
# %% [markdown]
465609
"""
466610
### Sequence

expression/collections/seq.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ def choose(self, chooser: Callable[[_TSource], Option[_TResult]]) -> Seq[_TResul
109109
xs = pipe(self, choose(chooser))
110110
return Seq(xs)
111111

112+
def concat(self: Seq[Seq[_TResult]]) -> Seq[_TResult]:
113+
"""Concatenate sequences.
114+
115+
Combines the given variable number of enumerations and/or
116+
enumeration-of-enumerations as a single concatenated enumeration.
117+
"""
118+
return Seq(concat(*self))
119+
112120
def collect(self, mapping: Callable[[_TSource], Seq[_TResult]]) -> Seq[_TResult]:
113121
"""Collect items from the sequence.
114122
@@ -123,8 +131,7 @@ def collect(self, mapping: Callable[[_TSource], Seq[_TResult]]) -> Seq[_TResult]
123131
A sequence comprising the concatenated values from the mapping
124132
function.
125133
"""
126-
xs = pipe(self, collect(mapping))
127-
return Seq(xs)
134+
return self.map(mapping).concat()
128135

129136
@staticmethod
130137
def delay(generator: Callable[[], Iterable[_TSource]]) -> Iterable[_TSource]:

expression/core/async_builder.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
"""Async builder module.
2+
3+
This module provides the base class for async builders, which are used to
4+
create computational expressions for async operations.
5+
"""
6+
7+
from abc import ABC
8+
from collections.abc import AsyncGenerator, Awaitable, Callable
9+
from functools import wraps
10+
from typing import Any, Generic, TypeVar, cast
11+
12+
from typing_extensions import ParamSpec
13+
14+
from .error import EffectError
15+
16+
17+
_T = TypeVar("_T") # The container item type
18+
_M = TypeVar("_M") # for container type
19+
_P = ParamSpec("_P")
20+
21+
22+
class AsyncBuilderState(Generic[_T]):
23+
"""Encapsulates the state of an async builder computation."""
24+
25+
def __init__(self):
26+
self.is_done = False
27+
28+
29+
class AsyncBuilder(Generic[_T, _M], ABC): # Corrected Generic definition
30+
"""Async effect builder."""
31+
32+
# Required methods
33+
async def bind(
34+
self, xs: _M, fn: Callable[[_T], Awaitable[_M]]
35+
) -> _M: # Use concrete types for Callable input and output
36+
raise NotImplementedError("AsyncBuilder does not implement a `bind` method")
37+
38+
async def return_(self, x: _T) -> _M:
39+
raise NotImplementedError("AsyncBuilder does not implement a `return` method")
40+
41+
async def return_from(self, xs: _M) -> _M:
42+
raise NotImplementedError("AsyncBuilder does not implement a `return` from method")
43+
44+
async def combine(self, xs: _M, ys: _M) -> _M:
45+
"""Used for combining multiple statements in the effect."""
46+
raise NotImplementedError("AsyncBuilder does not implement a `combine` method")
47+
48+
async def zero(self) -> _M:
49+
"""Zero effect.
50+
51+
Called if the effect raises StopAsyncIteration without a value, i.e
52+
returns None.
53+
"""
54+
raise NotImplementedError("AsyncBuilder does not implement a `zero` method")
55+
56+
# Optional methods for control flow
57+
async def delay(self, fn: Callable[[], _M]) -> _M:
58+
"""Delay the computation.
59+
60+
Default implementation is to return the result of the function.
61+
"""
62+
return fn()
63+
64+
async def run(self, computation: _M) -> _M:
65+
"""Run a computation.
66+
67+
Default implementation is to return the computation as is.
68+
"""
69+
return computation
70+
71+
# Internal implementation
72+
async def _send(
73+
self,
74+
gen: AsyncGenerator[_T, Any],
75+
state: AsyncBuilderState[_T], # Use AsyncBuilderState
76+
value: _T,
77+
) -> _M:
78+
try:
79+
yielded = await gen.asend(value)
80+
return await self.return_(yielded)
81+
except EffectError as error:
82+
# Effect errors (Nothing, Error, etc) short circuits
83+
state.is_done = True
84+
return await self.return_from(cast("_M", error.args[0]))
85+
except StopAsyncIteration:
86+
state.is_done = True
87+
raise
88+
except Exception:
89+
state.is_done = True
90+
raise
91+
92+
def __call__(
93+
self,
94+
fn: Callable[
95+
_P,
96+
AsyncGenerator[_T, Any],
97+
],
98+
) -> Callable[_P, Awaitable[_M]]:
99+
"""The builder decorator."""
100+
101+
@wraps(fn)
102+
async def wrapper(*args: _P.args, **kw: _P.kwargs) -> _M:
103+
gen = fn(*args, **kw)
104+
state = AsyncBuilderState[_T]() # Initialize AsyncBuilderState
105+
result: _M = await self.zero() # Initialize result
106+
value: _M
107+
108+
async def binder(value: Any) -> _M:
109+
ret = await self._send(gen, state, value) # Pass state to _send
110+
return await self.delay(lambda: ret) # Delay every bind call
111+
112+
try:
113+
# Initialize co-routine with None to start the generator and get the
114+
# first value
115+
result = value = await binder(None)
116+
117+
while not state.is_done: # Loop until coroutine is exhausted
118+
value = await self.bind(value, binder) # Send value to coroutine
119+
result = await self.combine(result, value) # Combine previous result with new value
120+
121+
except StopAsyncIteration:
122+
# This will happens if the generator exits by returning None
123+
pass
124+
125+
return await self.run(result) # Run the result
126+
127+
return wrapper
128+
129+
130+
__all__ = ["AsyncBuilder", "AsyncBuilderState"]

expression/core/option.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from __future__ import annotations
1010

1111
import builtins
12-
from collections.abc import Callable, Generator, Iterable
12+
from collections.abc import Awaitable, Callable, Generator, Iterable
1313
from typing import TYPE_CHECKING, Any, Literal, TypeGuard, TypeVar, cast, get_args, get_origin
1414

1515
from typing_extensions import TypeVarTuple, Unpack
@@ -42,6 +42,7 @@
4242
@tagged_union(frozen=True, order=True)
4343
class Option(
4444
Iterable[_TSourceOut],
45+
Awaitable[_TSourceOut],
4546
PipeMixin,
4647
):
4748
"""Option class."""
@@ -296,6 +297,16 @@ def __str__(self) -> str:
296297
def __repr__(self) -> str:
297298
return self.__str__()
298299

300+
def __await__(self) -> Generator[_TSourceOut, _TSourceOut, _TSourceOut]:
301+
"""Make Option awaitable by delegating to __iter__."""
302+
match self:
303+
case Option(tag="some", some=value):
304+
return value
305+
case _:
306+
raise EffectError(self)
307+
308+
yield None
309+
299310
@classmethod
300311
def __get_pydantic_core_schema__(cls, source_type: Any, handler: GetCoreSchemaHandler) -> CoreSchema:
301312
from pydantic import ValidatorFunctionWrapHandler

expression/core/result.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from __future__ import annotations
1313

1414
import builtins
15-
from collections.abc import Callable, Generator, Iterable
15+
from collections.abc import Awaitable, Callable, Generator, Iterable
1616
from typing import (
1717
TYPE_CHECKING,
1818
Any,
@@ -49,6 +49,7 @@
4949
@tagged_union(frozen=True, order=True)
5050
class Result(
5151
Iterable[_TSourceOut],
52+
Awaitable[_TSourceOut],
5253
PipeMixin,
5354
Generic[_TSourceOut, _TErrorOut],
5455
):
@@ -255,6 +256,16 @@ def __iter__(self) -> Generator[_TSourceOut, _TSourceOut, _TSourceOut]:
255256
case _:
256257
raise EffectError(self)
257258

259+
def __await__(self) -> Generator[_TSourceOut, _TSourceOut, _TSourceOut]:
260+
"""Make Result awaitable by delegating to __iter__."""
261+
match self:
262+
case Result(tag="ok", ok=value):
263+
return value
264+
case _:
265+
raise EffectError(self)
266+
267+
yield None
268+
258269
def __str__(self) -> str:
259270
match self:
260271
case Result(tag="ok", ok=value):

expression/effect/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
"""A collection of computational expression effects."""
22

3+
from .async_option import AsyncOptionBuilder as async_option
4+
from .async_result import AsyncResultBuilder as async_result
5+
from .async_result import AsyncTryBuilder as async_try
36
from .option import OptionBuilder as option
47
from .result import ResultBuilder as result
58
from .result import TryBuilder as try_
@@ -9,4 +12,4 @@
912
seq = seq_builder
1013

1114

12-
__all__ = ["option", "result", "seq", "try_"]
15+
__all__ = ["async_option", "async_result", "async_try", "option", "result", "seq", "try_"]

0 commit comments

Comments
 (0)