Skip to content

Latest commit

 

History

History
45 lines (36 loc) · 1.6 KB

File metadata and controls

45 lines (36 loc) · 1.6 KB

highstep

A sandbox for exploring how to derive properties from immutable event sequences. Not intended for production use.

Events are processed through composable aggregators that incrementally build up state. Each aggregator declares the fields it reads (deps) and writes (outputs), and the framework handles dependency ordering, field isolation, and validation.

from highstep import Event, Pipeline, SetClosed, SetOpen, emit, problems, aggregator
from highstep.event import Problem
from highstep.field import Field
from highstep.state import StateView
from highstep.aggregators import AggregatorResult
from enum import StrEnum

class Status(StrEnum):
    """The two states tracked by this example: open or closed.

    As a StrEnum, each member is also a str equal to its value.
    """
    OPEN = "open"
    CLOSED = "closed"

# Field named "status" carrying a Status value; declared with Status.OPEN as its default.
STATUS = Field[Status]("status", default=Status.OPEN)

@aggregator(outputs=(STATUS,))
def track_status(state: StateView, event: Event) -> AggregatorResult:
    """Flip STATUS in response to SetClosed / SetOpen payloads.

    Returns an emitted STATUS value when the event changes the current
    status, a Problem tagged with the event id when the event is redundant
    ("already closed" / "already open"), and None for every other
    payload/status combination.
    """
    # Read both inputs up front, payload first, regardless of payload type.
    payload = event.payload
    current = state[STATUS]

    if isinstance(payload, SetClosed):
        if current == Status.CLOSED:
            return problems(Problem(event.id, "already closed"))
        if current == Status.OPEN:
            return emit(STATUS.value(Status.CLOSED))
    elif isinstance(payload, SetOpen):
        if current == Status.OPEN:
            return problems(Problem(event.id, "already open"))
        if current == Status.CLOSED:
            return emit(STATUS.value(Status.OPEN))

    # Unrelated payload, or a status that is neither OPEN nor CLOSED.
    return None

# Assemble a pipeline whose only aggregator is track_status.
pipeline = Pipeline([track_status])

TODO

  • Couple each Field to its Aggregator directly, so that a Field knows how to produce itself and dependency resolution is implicit (e.g. Pipeline([SPICY]) automatically pulls in TRICKY, STATUS, etc.).
  • Event serialization/deserialization via a Payload base class.