-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
Currently we do not have a state machine that reflects the state of the playbook run at any given time.
This is why the metrics observer is somewhat clunky; using a context manager might help in that area.
Current Context objects:
@dataclass
class PhaseContext:
    """Tracks phase-level context.

    Attributes:
        id: Identifier of the phase.
        name: Name of the phase.
        start_time: Timestamp recorded for the start of the phase.
        step_ids: IDs of the steps registered under this phase.
    """

    id: str
    name: str
    start_time: datetime
    # ``field(default_factory=...)`` is only honoured on a dataclass; it gives
    # every instance its own set instead of one object shared by the class.
    step_ids: Set[str] = field(default_factory=set)
@dataclass
class StepContext:
    """Tracks step-level context.

    Attributes:
        id: Identifier of the step.
        step_index: Index of the step.
        session: Name of the session the step uses.
        start_time: Timestamp recorded for the start of the step.
        phase_id: ID of the phase this step is registered under.
        request_ids: IDs of the requests registered under this step.
    """

    id: str
    step_index: int
    session: str
    start_time: datetime
    phase_id: str
    # ``field(default_factory=...)`` is only honoured on a dataclass; it gives
    # every instance its own set instead of one object shared by the class.
    request_ids: Set[str] = field(default_factory=set)
@dataclass
class RequestContext:
    """Tracks request-level context.

    Attributes:
        id: Identifier of the request.
        method: Request method.
        endpoint: Request endpoint.
        start_time: Timestamp recorded for the start of the request.
        step_id: ID of the step this request is registered under.
    """

    # Declared as a dataclass so the annotated fields become constructor
    # arguments and instance attributes, matching the other context classes.
    id: str
    method: str
    endpoint: str
    start_time: datetime
    step_id: str
These Context objects are about data and relationships, while the state machine is about execution flow and transitions.
Here's how I would organize them to work together:
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Set
class ExecutionState(Enum):
    """Enumeration of the states an execution can be in.

    Every member's value is the lower-case form of its name, so a state can
    be looked up from its string value, e.g. ``ExecutionState("running")``.
    """

    NOT_STARTED = "not_started"
    INITIALIZING = "initializing"
    RUNNING = "running"
    CHECKPOINTING = "checkpointing"
    RESUMING = "resuming"
    PARALLEL_EXECUTION = "parallel_execution"
    SEQUENTIAL_EXECUTION = "sequential_execution"
    CANCELLING = "cancelling"
    FAILED = "failed"
    COMPLETED = "completed"
@dataclass
class RequestContext:
    """Data record describing a single request.

    Attributes:
        id: Identifier of the request.
        method: Request method.
        endpoint: Request endpoint.
        start_time: Timestamp recorded for the start of the request.
        step_id: ID of the step this request is registered under.
    """

    id: str
    method: str
    endpoint: str
    start_time: datetime
    step_id: str
@dataclass
class StepContext:
    """Data record describing a single step.

    Attributes:
        id: Identifier of the step.
        step_index: Index of the step.
        session: Name of the session the step uses.
        start_time: Timestamp recorded for the start of the step.
        phase_id: ID of the phase this step is registered under.
        request_ids: IDs of the requests registered under this step; each
            instance starts with its own empty set.
        config: Step configuration, ``None`` by default. Typed as ``Any``;
            expected to be a StepConfig.
    """

    id: str
    step_index: int
    session: str
    start_time: datetime
    phase_id: str
    request_ids: Set[str] = field(default_factory=set)
    config: Any = None  # StepConfig
@dataclass
class PhaseContext:
    """Data record describing a single phase.

    Attributes:
        id: Identifier of the phase.
        name: Name of the phase.
        start_time: Timestamp recorded for the start of the phase.
        step_ids: IDs of the steps registered under this phase; each
            instance starts with its own empty set.
        config: Phase configuration, ``None`` by default. Typed as ``Any``;
            expected to be a PhaseConfig.
    """

    id: str
    name: str
    start_time: datetime
    step_ids: Set[str] = field(default_factory=set)
    config: Any = None  # PhaseConfig
@dataclass
class ExecutionContext:
    """Container for all context data of an execution.

    Attributes:
        current_phase: The phase currently recorded as active, or ``None``.
        current_step: The step currently recorded as active, or ``None``.
        current_request: The request currently recorded as active, or
            ``None``.
        phases: Every tracked phase context, keyed by its ID.
        steps: Every tracked step context, keyed by its ID.
        requests: Every tracked request context, keyed by its ID.
        variables: Free-form variables, keyed by name.
    """

    # "Current" pointers; all start out unset.
    current_phase: Optional[PhaseContext] = None
    current_step: Optional[StepContext] = None
    current_request: Optional[RequestContext] = None

    # Lookup tables; each instance gets its own empty dict.
    phases: Dict[str, PhaseContext] = field(default_factory=dict)
    steps: Dict[str, StepContext] = field(default_factory=dict)
    requests: Dict[str, RequestContext] = field(default_factory=dict)

    # Variable storage.
    variables: Dict[str, Any] = field(default_factory=dict)
class ExecutionStateManager:
    """Manages both state transitions and context data.

    The manager owns the current ``ExecutionState`` and an
    ``ExecutionContext``. Each ``begin_*`` method first checks that the
    current state permits the operation, then creates the matching context
    object, registers it in the context collections and logs the event.

    NOTE(review): the manager starts in NOT_STARTED and the only transition
    performed here is ``begin_phase`` -> RUNNING. Nothing in this class moves
    the state to INITIALIZING, SEQUENTIAL_EXECUTION or PARALLEL_EXECUTION,
    although the guards below require them. Those transitions still need to
    be defined before the ``begin_*`` methods can be reached in order.
    """

    def __init__(self, logger: BaseLogger):
        """Create a manager in the NOT_STARTED state with an empty context.

        Args:
            logger: Logger used for progress messages; this class only calls
                its ``log_info`` method.
        """
        self.logger = logger
        self._state = ExecutionState.NOT_STARTED
        self._context = ExecutionContext()
        self._running_requests: List[asyncio.Task] = []

    @property
    def state(self) -> ExecutionState:
        """The current execution state."""
        return self._state

    @property
    def context(self) -> ExecutionContext:
        """The execution context holding all tracked phases/steps/requests."""
        return self._context

    def _require_state(self, action: str, *allowed: ExecutionState) -> None:
        """Raise ``ValueError`` unless the current state is one of *allowed*."""
        if self._state not in allowed:
            raise ValueError(f"Cannot begin {action} in state {self._state}")

    def begin_phase(self, phase_config: Any) -> PhaseContext:
        """Begin execution of a new phase.

        Args:
            phase_config: Phase configuration; its ``name`` attribute is read.

        Returns:
            The newly created phase context, which is also stored as the
            current phase.

        Raises:
            ValueError: If the state is neither INITIALIZING nor RUNNING.
        """
        self._require_state(
            "phase", ExecutionState.INITIALIZING, ExecutionState.RUNNING
        )

        # IDs are derived from the number of phases tracked so far.
        phase_context = PhaseContext(
            id=f"phase-{len(self._context.phases)}",
            name=phase_config.name,
            start_time=datetime.now(),
            config=phase_config,
        )

        self._state = ExecutionState.RUNNING
        self._context.current_phase = phase_context
        # A new phase has no step or request yet; drop the pointers left over
        # from the previous phase so they cannot disagree with current_phase.
        self._context.current_step = None
        self._context.current_request = None
        self._context.phases[phase_context.id] = phase_context

        self.logger.log_info(f"Beginning phase: {phase_context.name}")
        return phase_context

    def begin_step(self, step_config: Any, phase_context: PhaseContext) -> StepContext:
        """Begin execution of a new step.

        Args:
            step_config: Step configuration; its ``session`` attribute is read.
            phase_context: Phase the step is registered under.

        Returns:
            The newly created step context, which is also stored as the
            current step.

        Raises:
            ValueError: If the state is neither SEQUENTIAL_EXECUTION nor
                PARALLEL_EXECUTION.
        """
        self._require_state(
            "step",
            ExecutionState.SEQUENTIAL_EXECUTION,
            ExecutionState.PARALLEL_EXECUTION,
        )

        # The ID counts all steps tracked so far; the index counts only the
        # steps already registered on this phase.
        step_context = StepContext(
            id=f"step-{len(self._context.steps)}",
            step_index=len(phase_context.step_ids),
            session=step_config.session,
            start_time=datetime.now(),
            phase_id=phase_context.id,
            config=step_config,
        )

        phase_context.step_ids.add(step_context.id)
        self._context.current_step = step_context
        # The new step has not issued a request yet; clear the pointer to the
        # previous step's request.
        self._context.current_request = None
        self._context.steps[step_context.id] = step_context

        self.logger.log_info(f"Beginning step {step_context.step_index} in phase {phase_context.name}")
        return step_context

    def begin_request(self, method: str, endpoint: str, step_context: StepContext) -> RequestContext:
        """Begin execution of a new request.

        Args:
            method: Request method.
            endpoint: Request endpoint.
            step_context: Step the request is registered under.

        Returns:
            The newly created request context, which is also stored as the
            current request.

        Raises:
            ValueError: If the state is neither SEQUENTIAL_EXECUTION nor
                PARALLEL_EXECUTION.
        """
        self._require_state(
            "request",
            ExecutionState.SEQUENTIAL_EXECUTION,
            ExecutionState.PARALLEL_EXECUTION,
        )

        # IDs are derived from the number of requests tracked so far.
        request_context = RequestContext(
            id=f"request-{len(self._context.requests)}",
            method=method,
            endpoint=endpoint,
            start_time=datetime.now(),
            step_id=step_context.id,
        )

        step_context.request_ids.add(request_context.id)
        self._context.current_request = request_context
        self._context.requests[request_context.id] = request_context

        self.logger.log_info(f"Beginning request {method} {endpoint}")
        return request_context

    def track_request(self, task: asyncio.Task) -> None:
        """Register *task* as a running request."""
        self._running_requests.append(task)

    def untrack_request(self, task: asyncio.Task) -> None:
        """Remove *task* from the running requests.

        Untracking a task that is not tracked is a no-op, so this is safe to
        call from a ``finally`` block.
        """
        if task in self._running_requests:
            self._running_requests.remove(task)
Usage in the Playbook:
async def _execute_step(self, step_config: StepConfig, phase_context_id: str, step_index: int) -> None:
    """Run one step: register its contexts, then await its request task.

    Args:
        step_config: Configuration of the step; ``session`` and ``request``
            (``method``/``endpoint``) are read from it.
        phase_context_id: Key of the owning phase in the state manager's
            ``phases`` mapping.
        step_index: Not used in this body.
    """
    manager = self.state_manager

    # Look up the owning phase and open a step context under it.
    step_context = manager.begin_step(
        step_config, manager.context.phases[phase_context_id]
    )

    # NOTE(review): the returned session is not used below; the call is kept
    # because get_session may have side effects - confirm.
    session = self.session_manager.get_session(step_config.session)

    request_context = manager.begin_request(
        step_config.request.method,
        step_config.request.endpoint,
        step_context,
    )

    # Run the request as a task so the state manager can track it while it
    # is in flight.
    request_task = asyncio.create_task(
        self._execute_single_step(step_context, request_context)
    )
    manager.track_request(request_task)
    try:
        await request_task
        # Only reached when the task finished without raising.
        manager.advance_step()
    finally:
        # Always stop tracking, whether the task succeeded or failed.
        manager.untrack_request(request_task)
The key differences and relationships:
- State Machine: Handles the "when" and "what's allowed"
  - Controls execution flow
  - Validates transitions
  - Ensures operations happen in the correct order
- Context Objects: Handle the "what" and "who"
  - Store data about each execution unit
  - Track relationships between phases/steps/requests
  - Maintain execution history
  - Used by observers and metrics
- Working Together:
  - State machine validates whether we can start a new phase/step/request
  - Context objects store the data about that phase/step/request
  - Observers can access both state and context
  - Metrics can be collected with full context
  - Error handling has complete information
Metadata
Metadata
Assignees
Labels
No labels