Skip to content

Commit bf71028

Browse files
feat(069-A2): actor-owned usage aggregate + persistence (#560)
* feat(069-A2): actor-owned usage aggregate + persistence Spec 069 Stream A2 (T005–T010): in-memory usage rollup owned by the ActivityService goroutine, published to readers as a copy-on-write snapshot via atomic pointer, persisted to BBolt with cold-start load-or-rebuild. - UsageAggregate/ToolUsage/TimeBucket + incremental Apply (internal/runtime/usage_aggregate.go) - UsageStore: atomic-pointer COW snapshot; lock-free, non-blocking reads - ActivityService owns it: Apply on save in handleEvent, UsageSnapshot() reader, periodic 30s flush + flush-on-shutdown, cold-start load-or-rebuild (one full scan) - activity_stats BBolt bucket (versioned key) + byte-oriented persist/load + ScanAllActivities - observability.usage_cache_ttl (5s) + usage_persist_interval (30s) config: defaults + hot-reload - docs (configuration.md) + spec trace (tasks T005–T010, data-model A2 notes) TDD: aggregate math, snapshot/reads-never-block, persistence round-trip, cold-start rebuild vs load, and config defaults all covered. Full internal/runtime -race suite green (approval-hash canary safe). Related #745 Co-Authored-By: Paperclip <noreply@paperclip.ing> * perf(069-A2): make UsageStore.Apply O(1) on the activity hot path Apply previously cloned the entire usage aggregate (every tool + every time bucket) on every activity write via publishLocked, making the write hot path O(tools×buckets) instead of O(1) — violating spec 069 CN-002 ("aggregate update O(1) per activity write; must not block the request hot path"). Decouple publish from write: Apply now mutates the working aggregate under a short writer lock and only marks the snapshot stale (atomic dirty flag, O(1), no clone). The clone is deferred to Snapshot (publish-on-read): the first reader after a write burst materializes one fresh snapshot off the hot path; reads with no pending writes stay lock-free. The A3 endpoint and the 30s persist flush are the only readers, so clones are rare relative to writes. Test-first (ENG-1): TestUsageStore_ApplyDoesNotPublishPerWrite asserts 500 writes trigger zero publishes and exactly one clone on first read; BenchmarkUsageStore_Apply shows 1 alloc/op with 1000 primed tools (was O(tools) allocs/op). Existing snapshot/replace contract tests and the full -race runtime suite stay green. Related #560 Related MCP-835 * fix(069-A2): count blocked attempts + wire observability hot-reload Addresses CodexReviewer findings #2 and #3 on PR #560 (the O(1) fix cleared #1 once CI went green). #2 — blocked tool attempts were missing from the usage aggregate. They are persisted as blocked `policy_decision` records, but `Apply` dropped all non-tool_call records and `handlePolicyDecision` never fed the aggregate, so the contract's per-tool `blocked` field was permanently 0. `Apply` now also folds blocked policy_decisions: a blocked attempt never executed, so it increments only `Blocked` + `LastUsed` — not `Calls`, latency, bytes, or the executed-call timeline. `handlePolicyDecision` calls `usage.Apply` on save success so the live path matches a cold-start rebuild-from-scan. Extracted a `tool()` get-or-create helper. #3 — `observability.usage_persist_interval` claimed hot-reload but was only read at construction. `DetectConfigChanges` now flags an `observability` change and `ApplyConfig` pushes the new cadence into the running ActivityService via `SetUsagePersistInterval` (the flush loop already re-reads the interval each cycle). Test-first (ENG-1): aggregate counts blocked-only (not Calls/latency/ timeline); live `handlePolicyDecision` folds blocked into the snapshot; `DetectConfigChanges` detects observability as hot-reloadable; end-to-end `ApplyConfig` applies the new interval to a running runtime. Repointed the "ignores non-tool_calls" test to a non-blocked decision. Contract documents `blocked` semantics. Full internal/runtime+config+storage -race green; lint 0; personal+server builds. Related #560 Related MCP-835 Co-Authored-By: Paperclip <noreply@paperclip.ing> --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
1 parent ccbeb50 commit bf71028

21 files changed

Lines changed: 1406 additions & 14 deletions

docs/configuration.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,32 @@ See [Search Servers Documentation](search_servers.md) for complete details.
829829

830830
---
831831

832+
## Observability
833+
834+
Controls the usage-statistics aggregate that powers the Web UI usage graphs
835+
(spec 069). The aggregate is built incrementally from the activity log, kept in
836+
memory as an immutable snapshot, and periodically persisted so it survives
837+
restarts without a full re-scan.
838+
839+
```json
840+
{
841+
"observability": {
842+
"usage_cache_ttl": "5s",
843+
"usage_persist_interval": "30s"
844+
}
845+
}
846+
```
847+
848+
| Field | Type | Default | Description |
849+
|-------|------|---------|-------------|
850+
| `usage_cache_ttl` | duration string | `5s` | Freshness bound for the usage endpoint's read cache on wide time windows. |
851+
| `usage_persist_interval` | duration string | `30s` | How often the in-memory usage aggregate snapshot is flushed to storage (also flushed on graceful shutdown). |
852+
853+
Both fields are optional, accept Go duration strings (e.g. `"10s"`, `"1m"`),
854+
and are hot-reloadable. Non-positive values fall back to the defaults.
855+
856+
---
857+
832858
## Complete Example
833859

834860
Here's a complete configuration example with all major sections:

internal/config/config.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ type Config struct {
149149
// Telemetry settings (Spec 036)
150150
Telemetry *TelemetryConfig `json:"telemetry,omitempty" mapstructure:"telemetry"`
151151

152+
// Observability settings (Spec 069): usage aggregate cache/persistence cadence.
153+
Observability *ObservabilityConfig `json:"observability,omitempty" mapstructure:"observability"`
154+
152155
// Routing mode (Spec 031): how MCP tools are exposed to clients
153156
// Valid values: "retrieve_tools" (default), "direct", "code_execution"
154157
RoutingMode string `json:"routing_mode,omitempty" mapstructure:"routing-mode"`
@@ -732,6 +735,24 @@ func (c *IntentDeclarationConfig) IsStrictServerValidation() bool {
732735
return c.StrictServerValidation
733736
}
734737

738+
// ObservabilityConfig controls the Spec 069 usage aggregate cadence.
739+
type ObservabilityConfig struct {
740+
// UsageCacheTTL bounds the freshness of the usage endpoint's read cache for
741+
// wide windows (FR-005). Default 5s.
742+
UsageCacheTTL Duration `json:"usage_cache_ttl,omitempty" mapstructure:"usage-cache-ttl" swaggertype:"string"`
743+
// UsagePersistInterval is how often the actor-owned usage aggregate snapshot
744+
// is flushed to storage. Default 30s.
745+
UsagePersistInterval Duration `json:"usage_persist_interval,omitempty" mapstructure:"usage-persist-interval" swaggertype:"string"`
746+
}
747+
748+
// DefaultObservabilityConfig returns the default observability configuration.
749+
func DefaultObservabilityConfig() *ObservabilityConfig {
750+
return &ObservabilityConfig{
751+
UsageCacheTTL: Duration(5 * time.Second),
752+
UsagePersistInterval: Duration(30 * time.Second),
753+
}
754+
}
755+
735756
// ToolRegistration represents a tool registration
736757
type ToolRegistration struct {
737758
Name string `json:"name"`
@@ -965,6 +986,9 @@ func DefaultConfig() *Config {
965986

966987
// Intent declaration defaults (Spec 018) - strict validation by default for security
967988
IntentDeclaration: DefaultIntentDeclarationConfig(),
989+
990+
// Observability defaults (Spec 069)
991+
Observability: DefaultObservabilityConfig(),
968992
}
969993
}
970994

@@ -1351,6 +1375,19 @@ func (c *Config) Validate() error {
13511375
c.IntentDeclaration = DefaultIntentDeclarationConfig()
13521376
}
13531377

1378+
// Ensure Observability config is not nil and has sane cadence defaults
1379+
// (Spec 069). The hot-reload path re-runs Validate, so zeroed fields are
1380+
// repaired rather than disabling persistence/caching entirely.
1381+
if c.Observability == nil {
1382+
c.Observability = DefaultObservabilityConfig()
1383+
}
1384+
if c.Observability.UsageCacheTTL.Duration() <= 0 {
1385+
c.Observability.UsageCacheTTL = Duration(5 * time.Second)
1386+
}
1387+
if c.Observability.UsagePersistInterval.Duration() <= 0 {
1388+
c.Observability.UsagePersistInterval = Duration(30 * time.Second)
1389+
}
1390+
13541391
return nil
13551392
}
13561393

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package config
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestDefaultObservabilityConfig(t *testing.T) {
12+
o := DefaultObservabilityConfig()
13+
require.NotNil(t, o)
14+
assert.Equal(t, 5*time.Second, o.UsageCacheTTL.Duration())
15+
assert.Equal(t, 30*time.Second, o.UsagePersistInterval.Duration())
16+
}
17+
18+
func TestDefaultConfig_HasObservabilityDefaults(t *testing.T) {
19+
cfg := DefaultConfig()
20+
require.NotNil(t, cfg.Observability)
21+
assert.Equal(t, 5*time.Second, cfg.Observability.UsageCacheTTL.Duration())
22+
assert.Equal(t, 30*time.Second, cfg.Observability.UsagePersistInterval.Duration())
23+
}
24+
25+
func TestValidate_FillsObservabilityDefaults(t *testing.T) {
26+
// A config loaded without an observability block gets defaults applied
27+
// on Validate (hot-reload path re-runs Validate).
28+
cfg := DefaultConfig()
29+
cfg.Observability = nil
30+
require.NoError(t, cfg.Validate())
31+
require.NotNil(t, cfg.Observability)
32+
assert.Equal(t, 5*time.Second, cfg.Observability.UsageCacheTTL.Duration())
33+
assert.Equal(t, 30*time.Second, cfg.Observability.UsagePersistInterval.Duration())
34+
35+
// Zero/negative interval fields are repaired to defaults.
36+
cfg.Observability = &ObservabilityConfig{}
37+
require.NoError(t, cfg.Validate())
38+
assert.Equal(t, 5*time.Second, cfg.Observability.UsageCacheTTL.Duration())
39+
assert.Equal(t, 30*time.Second, cfg.Observability.UsagePersistInterval.Duration())
40+
}
41+
42+
func TestObservabilityConfig_PreservesUserValues(t *testing.T) {
43+
cfg := DefaultConfig()
44+
cfg.Observability = &ObservabilityConfig{
45+
UsageCacheTTL: Duration(2 * time.Second),
46+
UsagePersistInterval: Duration(60 * time.Second),
47+
}
48+
require.NoError(t, cfg.Validate())
49+
assert.Equal(t, 2*time.Second, cfg.Observability.UsageCacheTTL.Duration())
50+
assert.Equal(t, 60*time.Second, cfg.Observability.UsagePersistInterval.Duration())
51+
}

internal/runtime/activity_service.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package runtime
33
import (
44
"context"
55
"encoding/json"
6+
"sync/atomic"
67
"time"
78

89
"go.uber.org/zap"
@@ -49,11 +50,18 @@ type ActivityService struct {
4950

5051
// Event emitter for sensitive data detection events (Spec 026)
5152
eventEmitter SensitiveDataEventEmitter
53+
54+
// Usage aggregate (Spec 069 A2): actor-owned rollup of tool-call activity.
55+
// Mutated only on this goroutine via Apply; published to readers as an
56+
// immutable snapshot. usagePersistIntervalNs is the hot-reloadable flush
57+
// cadence in nanoseconds.
58+
usage *UsageStore
59+
usagePersistIntervalNs atomic.Int64
5260
}
5361

5462
// NewActivityService creates a new activity service.
5563
func NewActivityService(storage *storage.Manager, logger *zap.Logger) *ActivityService {
56-
return &ActivityService{
64+
s := &ActivityService{
5765
storage: storage,
5866
logger: logger,
5967
eventCh: make(chan Event, 100), // Buffer for non-blocking event delivery
@@ -62,7 +70,10 @@ func NewActivityService(storage *storage.Manager, logger *zap.Logger) *ActivityS
6270
maxRecords: DefaultRetentionMaxRecords,
6371
checkInterval: DefaultRetentionCheckInterval,
6472
detector: nil, // Detector is optional, set via SetDetector
73+
usage: newUsageStore(),
6574
}
75+
s.usagePersistIntervalNs.Store(int64(DefaultUsagePersistInterval))
76+
return s
6677
}
6778

6879
// SetDetector sets the sensitive data detector for async scanning (Spec 026).
@@ -103,17 +114,25 @@ func (s *ActivityService) Start(ctx context.Context, rt *Runtime) {
103114
// Start retention loop in a separate goroutine
104115
go s.runRetentionLoop(ctx)
105116

117+
// Spec 069 A2: load/rebuild the usage aggregate before processing events,
118+
// then start the periodic snapshot flush loop.
119+
s.initUsageFromStorage()
120+
go s.runUsageFlushLoop(ctx)
121+
106122
s.logger.Info("Activity service started")
107123

108124
for {
109125
select {
110126
case <-ctx.Done():
111127
s.logger.Info("Activity service shutting down")
128+
// Flush-on-shutdown: persist the final usage snapshot (Spec 069 A2).
129+
s.persistUsage()
112130
close(s.done)
113131
return
114132
case evt, ok := <-eventCh:
115133
if !ok {
116134
s.logger.Info("Activity service event channel closed")
135+
s.persistUsage()
117136
close(s.done)
118137
return
119138
}
@@ -303,6 +322,13 @@ func (s *ActivityService) handleToolCallCompleted(evt Event) {
303322
zap.String("tool_name", toolName),
304323
zap.String("status", status))
305324

325+
// Fold the persisted call into the usage aggregate (Spec 069 A2). Done
326+
// only on save success so the in-memory rollup stays consistent with a
327+
// cold-start rebuild that re-scans persisted records.
328+
if s.usage != nil {
329+
s.usage.Apply(record)
330+
}
331+
306332
// Run async sensitive data detection (Spec 026)
307333
if s.detector != nil {
308334
go s.runAsyncDetection(record.ID, arguments, response)
@@ -336,6 +362,15 @@ func (s *ActivityService) handlePolicyDecision(evt Event) {
336362
zap.Error(err),
337363
zap.String("server_name", serverName),
338364
zap.String("decision", decision))
365+
return
366+
}
367+
368+
// Fold blocked attempts into the usage aggregate (Spec 069 A2). Apply
369+
// ignores non-blocked decisions, so passing every policy decision is safe.
370+
// Done only on save success so the in-memory rollup stays consistent with a
371+
// cold-start rebuild that re-scans persisted records.
372+
if s.usage != nil {
373+
s.usage.Apply(record)
339374
}
340375
}
341376

internal/runtime/apply_config_restart_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"path/filepath"
66
"runtime"
77
"testing"
8+
"time"
89

910
"go.uber.org/zap"
1011

@@ -107,6 +108,41 @@ func TestApplyConfig_HotReloadableChange(t *testing.T) {
107108
assert.Equal(t, 20, savedCfg.ToolsLimit, "Config file should be updated with new ToolsLimit value")
108109
}
109110

111+
// TestApplyConfig_ObservabilityHotReload (MCP-835 / Codex finding #3): changing
112+
// the observability usage persist interval must hot-reload into the running
113+
// ActivityService — previously ApplyConfig only handled logging/truncator, so
114+
// SetUsagePersistInterval's "hot-reloadable" promise was unfulfilled.
115+
func TestApplyConfig_ObservabilityHotReload(t *testing.T) {
116+
tmpDir := t.TempDir()
117+
cfgPath := filepath.Join(tmpDir, "config.json")
118+
119+
initialCfg := config.DefaultConfig()
120+
initialCfg.Listen = "127.0.0.1:8080"
121+
initialCfg.DataDir = tmpDir
122+
require.NoError(t, config.SaveConfig(initialCfg, cfgPath))
123+
124+
rt, err := New(initialCfg, cfgPath, zap.NewNop())
125+
require.NoError(t, err)
126+
defer func() { _ = rt.Close() }()
127+
128+
// Default cadence is 30s before the reload.
129+
require.Equal(t, DefaultUsagePersistInterval, rt.ActivityService().usagePersistInterval())
130+
131+
newCfg := config.DefaultConfig()
132+
newCfg.Listen = "127.0.0.1:8080"
133+
newCfg.DataDir = tmpDir
134+
newCfg.Observability.UsagePersistInterval = config.Duration(10 * time.Second)
135+
136+
result, err := rt.ApplyConfig(newCfg, cfgPath)
137+
require.NoError(t, err)
138+
require.NotNil(t, result)
139+
140+
assert.False(t, result.RequiresRestart, "observability cadence change is hot-reloadable")
141+
assert.Contains(t, result.ChangedFields, "observability")
142+
assert.Equal(t, 10*time.Second, rt.ActivityService().usagePersistInterval(),
143+
"new persist interval must be applied to the running ActivityService")
144+
}
145+
110146
// TestApplyConfig_SaveFailure tests handling of save errors
111147
func TestApplyConfig_SaveFailure(t *testing.T) {
112148
// Skip on Windows: chmod on directories doesn't reliably prevent file creation

internal/runtime/config_hotreload.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ func DetectConfigChanges(oldCfg, newCfg *config.Config) *ConfigApplyResult {
133133
result.ChangedFields = append(result.ChangedFields, "environment")
134134
}
135135

136+
// Observability cadence (Spec 069 A2 — can be hot-reloaded; the usage flush
137+
// loop re-reads the interval each cycle, so applying it is just a setter).
138+
if !reflect.DeepEqual(oldCfg.Observability, newCfg.Observability) {
139+
result.ChangedFields = append(result.ChangedFields, "observability")
140+
}
141+
136142
// If no changes detected
137143
if len(result.ChangedFields) == 0 {
138144
result.AppliedImmediately = false

internal/runtime/config_hotreload_test.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,32 @@ import (
99
"github.com/stretchr/testify/require"
1010
)
1111

12+
// TestDetectConfigChanges_Observability (MCP-835 / Codex finding #3): changing
13+
// the observability usage cadence must be detected as a hot-reloadable change so
14+
// ApplyConfig can push the new persist interval to the running ActivityService.
15+
// SetUsagePersistInterval advertises hot-reload; the detector must back it.
16+
func TestDetectConfigChanges_Observability(t *testing.T) {
17+
base := &config.Config{
18+
Listen: "127.0.0.1:8080", DataDir: "/d", TLS: &config.TLSConfig{},
19+
Observability: &config.ObservabilityConfig{
20+
UsageCacheTTL: config.Duration(5 * time.Second),
21+
UsagePersistInterval: config.Duration(30 * time.Second),
22+
},
23+
}
24+
changed := &config.Config{
25+
Listen: "127.0.0.1:8080", DataDir: "/d", TLS: &config.TLSConfig{},
26+
Observability: &config.ObservabilityConfig{
27+
UsageCacheTTL: config.Duration(5 * time.Second),
28+
UsagePersistInterval: config.Duration(10 * time.Second),
29+
},
30+
}
31+
32+
result := DetectConfigChanges(base, changed)
33+
require.True(t, result.Success)
34+
assert.Contains(t, result.ChangedFields, "observability")
35+
assert.False(t, result.RequiresRestart, "cadence change is hot-reloadable")
36+
}
37+
1238
func TestDetectConfigChanges(t *testing.T) {
1339
baseConfig := &config.Config{
1440
Listen: "127.0.0.1:8080",
@@ -49,7 +75,7 @@ func TestDetectConfigChanges(t *testing.T) {
4975
Listen: ":9090", // Changed
5076
DataDir: "/test/data",
5177
APIKey: "test-key",
52-
ToolsLimit: 15,
78+
ToolsLimit: 15,
5379
ToolResponseLimit: 1000,
5480
CallToolTimeout: config.Duration(60 * time.Second),
5581
Servers: []*config.ServerConfig{},
@@ -67,7 +93,7 @@ func TestDetectConfigChanges(t *testing.T) {
6793
Listen: "127.0.0.1:8080",
6894
DataDir: "/different/data", // Changed
6995
APIKey: "test-key",
70-
ToolsLimit: 15,
96+
ToolsLimit: 15,
7197
ToolResponseLimit: 1000,
7298
CallToolTimeout: config.Duration(60 * time.Second),
7399
Servers: []*config.ServerConfig{},
@@ -85,7 +111,7 @@ func TestDetectConfigChanges(t *testing.T) {
85111
Listen: "127.0.0.1:8080",
86112
DataDir: "/test/data",
87113
APIKey: "new-key", // Changed
88-
ToolsLimit: 15,
114+
ToolsLimit: 15,
89115
ToolResponseLimit: 1000,
90116
CallToolTimeout: config.Duration(60 * time.Second),
91117
Servers: []*config.ServerConfig{},
@@ -103,7 +129,7 @@ func TestDetectConfigChanges(t *testing.T) {
103129
Listen: "127.0.0.1:8080",
104130
DataDir: "/test/data",
105131
APIKey: "test-key",
106-
ToolsLimit: 15,
132+
ToolsLimit: 15,
107133
ToolResponseLimit: 1000,
108134
CallToolTimeout: config.Duration(60 * time.Second),
109135
Servers: []*config.ServerConfig{},
@@ -124,7 +150,7 @@ func TestDetectConfigChanges(t *testing.T) {
124150
Listen: "127.0.0.1:8080",
125151
DataDir: "/test/data",
126152
APIKey: "test-key",
127-
ToolsLimit: 20, // Changed
153+
ToolsLimit: 20, // Changed
128154
ToolResponseLimit: 1000,
129155
CallToolTimeout: config.Duration(60 * time.Second),
130156
Servers: []*config.ServerConfig{},
@@ -144,7 +170,7 @@ func TestDetectConfigChanges(t *testing.T) {
144170
Listen: "127.0.0.1:8080",
145171
DataDir: "/test/data",
146172
APIKey: "test-key",
147-
ToolsLimit: 15,
173+
ToolsLimit: 15,
148174
ToolResponseLimit: 1000,
149175
CallToolTimeout: config.Duration(60 * time.Second),
150176
Servers: []*config.ServerConfig{ // Changed

0 commit comments

Comments
 (0)