Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions expression/collections/seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def delay(generator: Callable[[], Iterable[_TSource]]) -> Iterable[_TSource]:
return delay(generator)

@staticmethod
def empty() -> Seq[Any]:
def empty() -> Seq[_TSource]:
"""Returns empty sequence."""
return Seq()

Expand Down Expand Up @@ -363,7 +363,8 @@ def zip(self, other: Iterable[_TResult]) -> Iterable[tuple[_TSource, _TResult]]:

def __iter__(self) -> Iterator[_TSource]:
"""Return iterator for sequence."""
return builtins.iter(self._value)
# Make sure we return a proper generator that can handle send, throw, and close
return (x for x in self._value)

def __repr__(self) -> str:
result = "["
Expand Down Expand Up @@ -397,7 +398,8 @@ def __init__(self, gen: Callable[[], Iterable[_TSource]]) -> None:

def __iter__(self) -> Iterator[_TSource]:
xs = self.gen()
return builtins.iter(xs)
# Make sure we return a proper generator that can handle send, throw, and close
return (x for x in xs)


def append(
Expand Down
145 changes: 58 additions & 87 deletions expression/core/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,147 +8,118 @@
from .error import EffectError


_TInner = TypeVar("_TInner")
_TOuter = TypeVar("_TOuter")
_T = TypeVar("_T") # for value type
_M = TypeVar("_M") # for monadic type
_P = ParamSpec("_P")


class Builder(Generic[_TInner, _TOuter], ABC):
class BuilderState(Generic[_T]):
"""Encapsulates the state of a builder computation."""

def __init__(self):
self.is_done = False


class Builder(Generic[_T, _M], ABC): # Corrected Generic definition
"""Effect builder."""

def bind(self, xs: _TOuter, fn: Callable[[Any], _TOuter]) -> _TOuter:
raise NotImplementedError("Builder does not implement a bind method")
# Required methods
def bind(self, xs: _M, fn: Callable[[_T], _M]) -> _M: # Use concrete types for Callable input and output
raise NotImplementedError("Builder does not implement a `bind` method")

def return_(self, x: _TInner) -> _TOuter:
raise NotImplementedError("Builder does not implement a return method")
def return_(self, x: _T) -> _M:
raise NotImplementedError("Builder does not implement a `return` method")

def return_from(self, xs: _TOuter) -> _TOuter:
raise NotImplementedError("Builder does not implement a return from method")
def return_from(self, xs: _M) -> _M:
raise NotImplementedError("Builder does not implement a `return` from method")

def combine(self, xs: _TOuter, ys: _TOuter) -> _TOuter:
def combine(self, xs: _M, ys: _M) -> _M:
"""Used for combining multiple statements in the effect."""
raise NotImplementedError("Builder does not implement a combine method")
raise NotImplementedError("Builder does not implement a `combine` method")

def zero(self) -> _TOuter:
def zero(self) -> _M:
"""Zero effect.

Called if the effect raises StopIteration without a value, i.e
returns None.
"""
raise NotImplementedError("Builder does not implement a zero method")
raise NotImplementedError("Builder does not implement a `zero` method")

def delay(self, fn: Callable[[], _TOuter]) -> _TOuter:
# Optional methods for control flow
def delay(self, fn: Callable[[], _M]) -> _M:
"""Delay the computation.

In F# computation expressions, delay wraps the entire computation to ensure
it is not evaluated until run. This enables proper sequencing of effects
and lazy evaluation.

Args:
fn: The computation to delay

Returns:
The delayed computation
Default implementation is to return the result of the function.
"""
return fn()

def run(self, computation: _TOuter) -> _TOuter:
def run(self, computation: _M) -> _M:
"""Run a computation.

Forces evaluation of a delayed computation. In F# computation expressions,
run is called at the end to evaluate the entire computation that was
wrapped in delay.

Args:
computation: The computation to run

Returns:
The evaluated result
Default implementation is to return the computation as is.
"""
return computation

# Internal implementation
def _send(
self,
gen: Generator[Any, Any, Any],
done: list[bool],
value: _TInner | None = None,
) -> _TOuter:
state: BuilderState[_T], # Use BuilderState
value: _T,
) -> _M:
try:
yielded = gen.send(value)
return self.return_(yielded)
except EffectError as error:
# Effect errors (Nothing, Error, etc) short circuits the processing so we
# set `done` to `True` here.
done.append(True)
# get value from exception
value = error.args[0]
return self.return_from(cast("_TOuter", value))
# Effect errors (Nothing, Error, etc) short circuits
state.is_done = True
return self.return_from(cast("_M", error.args[0]))
except StopIteration as ex:
done.append(True)
state.is_done = True

# Return of a value in the generator produces StopIteration with a value
if ex.value is not None:
return self.return_(ex.value)
raise

raise # Raise StopIteration with no value

except RuntimeError:
done.append(True)
raise StopIteration
state.is_done = True
return self.zero() # Return zero() to handle generator runtime errors instead of raising StopIteration

def __call__(
self,
fn: Callable[
_P,
Generator[_TInner | None, _TInner, _TInner | None] | Generator[_TInner | None, None, _TInner | None],
Generator[_T | None, _T, _T | None] | Generator[_T | None, None, _T | None],
],
) -> Callable[_P, _TOuter]:
"""Option builder.

Enables the use of computational expressions using coroutines.
Thus inside the coroutine the keywords `yield` and `yield from`
reassembles `yield` and `yield!` from F#.

Args:
fn: A function that contains a computational expression and
returns either a coroutine, generator or an option.

Returns:
A `builder` function that can wrap coroutines into builders.
"""
) -> Callable[_P, _M]:
"""The builder decorator."""

@wraps(fn)
def wrapper(*args: _P.args, **kw: _P.kwargs) -> _TOuter:
def wrapper(*args: _P.args, **kw: _P.kwargs) -> _M:
gen = fn(*args, **kw)
done: list[bool] = []

result: _TOuter | None = None
state = BuilderState[_T]() # Initialize BuilderState
result: _M = self.zero() # Initialize result
value: _M

def binder(value: Any) -> _TOuter:
ret = self._send(gen, done, value)

# Delay every result except the first
if result is not None:
return self.delay(lambda: ret)
return ret
def binder(value: Any) -> _M:
ret = self._send(gen, state, value) # Pass state to _send
return self.delay(lambda: ret) # Delay every bind call

try:
result = self._send(gen, done)
# Initialize co-routine with None to start the generator and get the
# first value
result = value = binder(None)

while not done:
cont = self.bind(result, binder)
while not state.is_done: # Loop until coroutine is exhausted
value: _M = self.bind(value, binder) # Send value to coroutine
result = self.combine(result, value) # Combine previous result with new value

# Combine every result except the first
if result is None:
result = cont
else:
result = self.combine(result, cont)
except StopIteration:
# This will happens if the generator exits by returning None
pass

# If anything returns `None` (i.e raises StopIteration without a value) then
# we expect the effect to have a zero method implemented.
if result is None:
result = self.zero()

# Run the computation at the end
return self.run(result)
return self.run(result) # Run the result

return wrapper
5 changes: 4 additions & 1 deletion expression/effect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
from .option import OptionBuilder as option
from .result import ResultBuilder as result
from .result import TryBuilder as try_
from .seq import SeqBuilder as seq
from .seq import SeqBuilder as seq_builder


seq = seq_builder


__all__ = ["option", "result", "seq", "try_"]
62 changes: 58 additions & 4 deletions expression/effect/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,75 @@ class ResultBuilder(Builder[_TSource, Result[Any, _TError]]):
def bind(
self,
xs: Result[_TSource, _TError],
fn: Callable[[_TSource], Result[_TResult, _TError]],
fn: Callable[[Any], Result[_TResult, _TError]],
) -> Result[_TResult, _TError]:
"""Bind a function to a result value.

In F# computation expressions, this corresponds to ``let!`` and enables
sequencing of computations.

Args:
xs: The result value to bind
fn: The function to apply to the value if Ok

Returns:
The result of applying fn to the value if Ok, otherwise Error
"""
return pipe(xs, result.bind(fn))

def return_(self, x: _TSource) -> Result[_TSource, _TError]:
def return_(self, x: _TSource) -> Result[_TSource, _TError]: # Use Any for return_ type
"""Wrap a value in a result.

In F# computation expressions, this corresponds to ``return`` and lifts
a value into the result context.

Args:
x: The value to wrap

Returns:
Ok containing the value
"""
return Ok(x)

def return_from(self, xs: Result[_TSource, _TError]) -> Result[_TSource, _TError]:
"""Return a result value directly.

In F# computation expressions, this corresponds to ``return!`` and allows
returning an already wrapped value.

Args:
xs: The result value to return

Returns:
The result value unchanged
"""
return xs

def combine(self, xs: Result[_TSource, _TError], ys: Result[_TSource, _TError]) -> Result[_TSource, _TError]:
"""Combine two result computations.

In F# computation expressions, this enables sequencing multiple
expressions where we only care about the final result.

Args:
xs: First result computation
ys: Second result computation

Returns:
The second computation if first is Ok, otherwise Error
"""
return xs.bind(lambda _: ys)

def zero(self) -> Result[_TSource, _TError]:
raise NotImplementedError
def zero(self) -> Result[Any, _TError]: # Use Any for zero return type
"""Return the zero value for results.

In F# computation expressions, this is used when no value is returned,
corresponding to Ok(()) in F#.

Returns:
Ok(None)
"""
return Ok(None)

def __call__(
self, # Ignored self parameter
Expand Down
Loading