feat(schedule): operator-registered recurring goals with reproducible firing#1806
Conversation
… firing Closes #1798 Adds an in-project recurring-goals surface so operators do not depend on host-level systemd / cron / a cloud scheduler. A fire is a pure function of (schedule_id, fire_time, last_state) and lands on a byte-identical task graph across hosts; each fire chains into the existing AuditLog with event_type schedule.fire. Modules - src/bernstein/core/planning/schedule_store.py: schedule CRUD over .sdd/runtime/schedules/<id>.json plus a self-contained 5-field cron parser (no new runtime dep). - src/bernstein/core/orchestration/schedule_projection.py: pure deterministic projection; no wall-clock, no random, no host-dependent ordering. fire_time is pinned to int so sub-second drift cannot fork two operators. - src/bernstein/core/orchestration/schedule_supervisor.py: long-running supervisor with skip (default) and catch_up misfire policies, audit chain wiring via the existing AuditLog primitives, and counterfactual receipts for skipped windows. - src/bernstein/core/trigger_sources/schedule.py: TriggerEvent normaliser for the existing trigger pipeline. - src/bernstein/cli/commands/schedule_cmd.py: add / list / show / remove / audit / doctor / run verbs with --json output. - src/bernstein/cli/commands/{status_cmd,doctor_cmd}.py: doctor integration reports supervisor liveness, last fire, next fire. - docs/operations/schedule.md: operator-facing reference (registration, restart semantics, audit walk). Acceptance criteria coverage - schedule add persists under .sdd/runtime/schedules/<id>.json and validates the cron expression. - schedule list / remove / show with human and --json output. - bernstein schedule run is the long-running supervisor; ticks build a TriggerEvent and dispatch through TriggerManager.evaluate. - Projection is byte-identical across hosts; two operators on the same (schedule_id, fire_time, last_state) compute the same task graph and the same projection_hash. - Each fire appends schedule.fire to the existing HMAC chain with (schedule_id, fire_time, projection_hash, prev_chain_digest). - bernstein schedule doctor + bernstein doctor surface the schedule supervisor's liveness and the next/last fire. - Misfire policy default = skip; catch_up is per-schedule opt-in. Both documented under docs/operations/schedule.md. - A missed window emits a lineage receipt the operator can replay. Tests - tests/unit/test_schedule_store.py: store CRUD + cron validation (37 cases). - tests/unit/test_schedule_projection.py: byte-identical determinism + purity (12 cases). - tests/unit/test_schedule_supervisor.py: cron iteration, misfire policies, chain integration, doctor status (26 cases). - tests/integration/test_schedule_audit_chain.py: end-to-end with the real AuditLog, two-operator byte-identical proof, tamper detection (5 cases).
There was a problem hiding this comment.
Sorry @chernistry, you have reached your weekly rate limit of 2500000 diff characters.
Please try again later or upgrade to continue using Sourcery
Sonar insights (advisory, no merge-block)Snapshot of
Run This comment is a soft signal. The Sonar scan runs on push to |
Review-bot acknowledgement summary
All must-address findings are resolved or acknowledged. |
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (13)
📝 WalkthroughWalkthroughThis PR implements a complete operator-registered recurring schedule system enabling deterministic, auditable goal execution on fixed cadences within a single bernstein installation. Schedule fires dispatch deterministic task graphs whose projections are cryptographically verifiable; misfire policies ( ChangesSchedule subsystem
Sequence DiagramsequenceDiagram
actor Op as Operator
participant CLI as bernstein schedule run
participant Sup as ScheduleSupervisor
participant Store as ScheduleStore
participant Proj as project_schedule_fire
participant AuditWriter as AuditLog
participant TrigMgr as TriggerManager
Op->>CLI: schedule run --interval 60
CLI->>Sup: __init__(store, dispatch_fn, audit_writer)
loop Every 60 seconds
CLI->>Sup: tick(now=current_epoch)
Sup->>Store: list() schedules
loop Per schedule
Sup->>Sup: _next_fire_after(last_fire) → fire_time
alt fire_time <= now
Sup->>Proj: project_schedule_fire(schedule_id, fire_time, last_state, ...)
Proj-->>Sup: ProjectionResult(canonical_bytes, projection_hash, rev)
Sup->>AuditWriter: append(schedule.fire, {projection_hash, prev_chain_digest, ...})
AuditWriter-->>Sup: chain_digest
Sup->>TrigMgr: dispatch(TriggerEvent with projection_hash, fire_time)
TrigMgr-->>Sup: async dispatch result
Sup->>Store: update_last_fire(schedule_id, fire_time)
Sup->>Sup: persist FireReceipt to runtime/schedule_receipts
end
end
Sup-->>CLI: list[FireReceipt]
end
CLI-->>Op: receipt counts + errors
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes This PR introduces a substantial new subsystem with interconnected layers: deterministic cron math, pure projection functions, stateful supervisor orchestration with misfire policy branching, audit-chain integration, and comprehensive CLI and test coverage. The review requires careful attention to cron iteration correctness, determinism properties in projection and audit chaining, and misfire semantics across Suggested labels
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
|
bernstein doctor observe for PR #1806 ( sonar -- WARN (project bernstein)
code-scanning -- WARN (5 open alert(s))
Skipped backends (credentials not configured)
See docs/observability/unified-doctor.md for backend setup notes. |
Contract drift detected - proposed patchInline autofix push failed ( Three contract tests act as drift detectors against the public CLI / API surface:
One or more failed on this PR. Files changed: How to applyEither run the regen script locally: uv run python scripts/regen_contract_drift.py --fixture all
git add -A && git commit -m "chore(ci): regenerate contract drift allow-lists"
git pushOr apply the patch directly: gh pr checkout 1806
git apply <<'PATCH'
diff --git a/tests/unit/test_readme_api_coverage.py b/tests/unit/test_readme_api_coverage.py
index f492845b..8151f2de 100644
--- a/tests/unit/test_readme_api_coverage.py
+++ b/tests/unit/test_readme_api_coverage.py
@@ -237,6 +237,8 @@ DOCUMENTED_COMMANDS: frozenset[str] = frozenset(
"interop",
# Bot-added: drift autofix (regen_contract_drift.py)
"desktop-register",
+ # Bot-added: drift autofix (regen_contract_drift.py)
+ "schedule",
}
)
PATCH
git add -A && git commit -m "chore(ci): regenerate contract drift allow-lists"
git pushFull diffdiff --git a/tests/unit/test_readme_api_coverage.py b/tests/unit/test_readme_api_coverage.py
index f492845b..8151f2de 100644
--- a/tests/unit/test_readme_api_coverage.py
+++ b/tests/unit/test_readme_api_coverage.py
@@ -237,6 +237,8 @@ DOCUMENTED_COMMANDS: frozenset[str] = frozenset(
"interop",
# Bot-added: drift autofix (regen_contract_drift.py)
"desktop-register",
+ # Bot-added: drift autofix (regen_contract_drift.py)
+ "schedule",
}
)Source CI run: https://github.com/sipyourdrink-ltd/bernstein/actions/runs/26249816693 Refs #1273. |
| """List all registered schedules.""" | ||
| sdd = _sdd_dir() | ||
| store = ScheduleStore(sdd) | ||
| schedules = store.list() |
| store = ScheduleStore(sdd) | ||
| supervisor = ScheduleSupervisor(store, lambda _evt: None, None) | ||
|
|
||
| status = supervisor.status() |
| self._store = store | ||
| self._dispatch = dispatch | ||
| self._chain = _AuditChainAdapter(audit_writer) if audit_writer is not None else None | ||
| self._catch_up_limit = max(1, int(catch_up_limit)) |
| misfire_policy: MisfirePolicy = "skip" | ||
| created_at: float = 0.0 | ||
| last_fire_at: float = 0.0 | ||
| extra: dict[str, Any] = field(default_factory=lambda: {}) |
| continue | ||
| start, end = value, hi | ||
|
|
||
| for v in range(start, end + 1, step): |
| scenario_id=schedule.scenario_id, | ||
| misfire_policy=schedule.misfire_policy, | ||
| created_at=schedule.created_at, | ||
| last_fire_at=float(fire_time), |
| metadata: dict[str, Any] = { | ||
| "source_type": "schedule", | ||
| "schedule_id": schedule_id, | ||
| "fire_time": float(fire_time), |
|
|
||
| return TriggerEvent( | ||
| source="schedule", | ||
| timestamp=float(fire_time), |
| timestamp=float(fire_time), | ||
| raw_payload={ | ||
| "schedule_id": schedule_id, | ||
| "fire_time": float(fire_time), |
Closes #1798.
Summary
Operator-registered recurring goals with a deterministic fire contract. A scheduled fire is a pure projection from
(schedule_id, fire_time, last_state)onto a canonical task graph: two operators that share the same triple land on the byte-identical task graph and the sameprojection_hash. Each fire appends aschedule.fireentry to the existingAuditLogHMAC chain (no parallel chain) and pushes aTriggerEventthrough the standard trigger pipeline.Files touched
src/bernstein/core/planning/schedule_store.py(new): schedule CRUD over.sdd/runtime/schedules/<id>.jsonplus a self-contained 5-field cron parser. No new runtime dependency.src/bernstein/core/orchestration/schedule_projection.py(new): pure deterministic projection; no wall-clock, no random, no host-dependent ordering.fire_timeis pinned tointso sub-second drift cannot fork two operators.src/bernstein/core/orchestration/schedule_supervisor.py(new): long-running supervisor with skip (default) and catch_up misfire policies, audit chain wiring via the existing AuditLog primitives, and counterfactual receipts for skipped windows.src/bernstein/core/trigger_sources/schedule.py(new): TriggerEvent normaliser for the existing trigger pipeline.src/bernstein/cli/commands/schedule_cmd.py(new):add/list/show/remove/audit/doctor/runverbs with--jsonoutput.src/bernstein/cli/commands/status_cmd.py,src/bernstein/cli/commands/doctor_cmd.py: doctor integration reports supervisor liveness, last fire, next fire.src/bernstein/cli/main.py: registerschedulesubcommand.docs/operations/schedule.md(new): operator-facing reference.tests/unit/test_schedule_store.py(new)tests/unit/test_schedule_projection.py(new)tests/unit/test_schedule_supervisor.py(new)tests/integration/test_schedule_audit_chain.py(new)Acceptance criteria coverage
bernstein schedule add --cron <expr> --goal <text> [--scenario <id>]persists under.sdd/runtime/schedules/<id>.jsonand validates the cron expression.schedule list,schedule remove <id>,schedule show <id>with human and--jsonoutput.bernstein schedule run) wakes at the configured cadence, builds a TriggerEvent, and dispatches through the existing trigger pipeline viaTriggerManager.evaluate.(schedule_id, fire_time, last_state)onto a canonical task graph; two operators with identical state arrive at byte-identical task graphs at fire_time T.event_type=schedule.firewith payload(schedule_id, fire_time, projection_hash, prev_chain_digest)in the existing AuditLog chain.bernstein schedule auditwalks the persisted receipts and verifies the recordedprojection_hashmatches a re-projection from the inputs.bernstein doctorandbernstein schedule doctor).skipdefault,catch_upopt-in) is documented indocs/operations/schedule.md, configurable per schedule, and missed windows leave counterfactual lineage receipts.docs/operations/schedule.md.Test plan
uv run pytest tests/unit/ -q --no-cov --timeout=120 -k "schedule or trigger_sources"(151 passed)uv run pytest tests/integration/test_schedule_audit_chain.py(5 passed)uv run ruff checkclean on new filesuv run ruff formatapplieduv run pyrightclean on new modulesuv run lint-importscleanschedule add->schedule list->schedule show->schedule doctor->schedule remove;bernstein doctortable includes the supervisor row.Summary by CodeRabbit
New Features
bernstein schedulecommand group supporting recurring cron-based scheduling with subcommands for add, list, show, remove, audit, run, and doctor health checks.skipandcatch_up) for handling missed schedule windows.Documentation
Tests