From c264776d08c7cfa95e917f912e1477361025cf9e Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 2 May 2026 13:11:06 +0000 Subject: [PATCH 1/3] feat(cache): cache-aware dedup with preserve_cache_prefix option (#50) Add PrefixPartition to pkg/cache that splits a chunk slice at the last cache_control marker. Wire into /v1/dedupe via options.preserve_cache_prefix: the frozen prefix is passed through unchanged; the dedup pipeline runs only on the suffix. Response stats include cache_prefix_frozen, cache_prefix_tokens, cache_prefix_hash, suffix_input_count, suffix_output_count. DedupeChunk gains cache_control field. DedupeOptions type added. Co-authored-by: Ona --- cmd/api.go | 98 +++++++++++++++++++++-------- pkg/cache/prefix.go | 128 ++++++++++++++++++++++++++++++++++++++ pkg/cache/prefix_test.go | 130 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 329 insertions(+), 27 deletions(-) create mode 100644 pkg/cache/prefix.go create mode 100644 pkg/cache/prefix_test.go diff --git a/cmd/api.go b/cmd/api.go index 6e221e5..1e9f7d8 100644 --- a/cmd/api.go +++ b/cmd/api.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + distillcache "github.com/Siddhant-K-code/distill/pkg/cache" "github.com/Siddhant-K-code/distill/pkg/contextlab" "github.com/Siddhant-K-code/distill/pkg/embedding/openai" "github.com/Siddhant-K-code/distill/pkg/metrics" @@ -61,14 +62,27 @@ type DedupeRequest struct { Threshold float64 `json:"threshold,omitempty"` Lambda float64 `json:"lambda,omitempty"` TargetK int `json:"target_k,omitempty"` + Options DedupeOptions `json:"options,omitempty"` +} + +// DedupeOptions controls optional dedup behaviour. +type DedupeOptions struct { + // PreserveCachePrefix freezes chunks before the last cache_control marker + // so the dedup pipeline cannot reorder or remove them. This prevents + // Distill from silently invalidating Anthropic prompt cache prefixes. + PreserveCachePrefix bool `json:"preserve_cache_prefix,omitempty"` } // DedupeChunk represents a chunk in the request. type DedupeChunk struct { - ID string `json:"id"` - Text string `json:"text"` - Embedding []float32 `json:"embedding,omitempty"` - Score float32 `json:"score,omitempty"` + ID string `json:"id"` + Text string `json:"text"` + Embedding []float32 `json:"embedding,omitempty"` + Score float32 `json:"score,omitempty"` + // CacheControl mirrors the Anthropic cache_control field. When non-empty, + // this chunk is treated as a cache boundary marker. Used with + // options.preserve_cache_prefix to freeze the prefix during dedup. + CacheControl string `json:"cache_control,omitempty"` } // DedupeResponse is the JSON response for /v1/dedupe. @@ -92,6 +106,13 @@ type DedupeStats struct { ClusterCount int `json:"cluster_count"` ReductionPct int `json:"reduction_pct"` LatencyMs int64 `json:"latency_ms"` + + // Cache prefix fields — populated when options.preserve_cache_prefix=true. + CachePrefixFrozen bool `json:"cache_prefix_frozen,omitempty"` + CachePrefixTokens int `json:"cache_prefix_tokens,omitempty"` + CachePrefixHash string `json:"cache_prefix_hash,omitempty"` + SuffixInputCount int `json:"suffix_input_count,omitempty"` + SuffixOutputCount int `json:"suffix_output_count,omitempty"` } // APIServer holds the API server state. @@ -346,32 +367,43 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) { start := time.Now() - // Convert to internal types + // Convert to internal types, preserving cache_control metadata. chunks := make([]types.Chunk, len(req.Chunks)) needsEmbedding := false - for i, c := range req.Chunks { chunks[i] = types.Chunk{ ID: c.ID, Text: c.Text, Embedding: c.Embedding, Score: c.Score, + Metadata: make(map[string]interface{}), + } + if c.CacheControl != "" { + chunks[i].Metadata["cache_control"] = c.CacheControl } if len(c.Embedding) == 0 { needsEmbedding = true } } - // Generate embeddings if needed + // Partition into frozen prefix + dedup-eligible suffix when requested. + var partition distillcache.PrefixPartition + dedupChunks := chunks + if req.Options.PreserveCachePrefix { + partition = distillcache.PartitionForCacheAwareDedup(chunks) + dedupChunks = partition.Suffix + } + + // Generate embeddings if needed (only for the dedup-eligible suffix). if needsEmbedding { if s.embedder == nil { http.Error(w, "Embeddings required but no embedding provider configured. Either provide embeddings in request or configure OPENAI_API_KEY.", http.StatusBadRequest) return } - _, embSpan := s.tracing.StartEmbedding(ctx, len(chunks)) - texts := make([]string, len(chunks)) - for i, c := range chunks { + _, embSpan := s.tracing.StartEmbedding(ctx, len(dedupChunks)) + texts := make([]string, len(dedupChunks)) + for i, c := range dedupChunks { texts[i] = c.Text } @@ -384,8 +416,8 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) { } embSpan.End() - for i := range chunks { - chunks[i].Embedding = embeddings[i] + for i := range dedupChunks { + dedupChunks[i].Embedding = embeddings[i] } } @@ -403,13 +435,13 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) { targetK = 0 // Will be set to cluster count } - // Cluster - _, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold) + // Cluster the dedup-eligible suffix only. + _, clusterSpan := s.tracing.StartClustering(ctx, len(dedupChunks), threshold) clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{ Threshold: threshold, Linkage: "average", }) - clusterResult := clusterer.Cluster(chunks) + clusterResult := clusterer.Cluster(dedupChunks) clusterSpan.End() // Select representatives @@ -432,14 +464,17 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) { mmrSpan.End() } + // Prepend the frozen prefix to the deduped suffix. + finalChunks := append(partition.Prefix, representatives...) + latency := time.Since(start) // Record result on root span - telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency) + telemetry.RecordResult(rootSpan, len(req.Chunks), len(finalChunks), clusterResult.ClusterCount, latency) // Build response - outputChunks := make([]DedupeChunkResponse, len(representatives)) - for i, c := range representatives { + outputChunks := make([]DedupeChunkResponse, len(finalChunks)) + for i, c := range finalChunks { outputChunks[i] = DedupeChunkResponse{ ID: c.ID, Text: c.Text, @@ -450,22 +485,31 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) { reductionPct := 0 if len(req.Chunks) > 0 { - reductionPct = int((1 - float64(len(representatives))/float64(len(req.Chunks))) * 100) + reductionPct = int((1 - float64(len(finalChunks))/float64(len(req.Chunks))) * 100) + } + + stats := DedupeStats{ + InputCount: len(req.Chunks), + OutputCount: len(finalChunks), + ClusterCount: clusterResult.ClusterCount, + ReductionPct: reductionPct, + LatencyMs: latency.Milliseconds(), + } + if req.Options.PreserveCachePrefix && partition.MarkerCount > 0 { + stats.CachePrefixFrozen = true + stats.CachePrefixTokens = partition.FrozenPrefixTokens + stats.CachePrefixHash = partition.PrefixHash + stats.SuffixInputCount = len(partition.Suffix) + stats.SuffixOutputCount = len(representatives) } resp := DedupeResponse{ Chunks: outputChunks, - Stats: DedupeStats{ - InputCount: len(req.Chunks), - OutputCount: len(representatives), - ClusterCount: clusterResult.ClusterCount, - ReductionPct: reductionPct, - LatencyMs: latency.Milliseconds(), - }, + Stats: stats, } // Record dedup-specific metrics - s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(representatives), clusterResult.ClusterCount) + s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(finalChunks), clusterResult.ClusterCount) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(resp) diff --git a/pkg/cache/prefix.go b/pkg/cache/prefix.go new file mode 100644 index 0000000..244b0c1 --- /dev/null +++ b/pkg/cache/prefix.go @@ -0,0 +1,128 @@ +package cache + +import ( + "crypto/sha256" + "encoding/hex" + + "github.com/Siddhant-K-code/distill/pkg/types" +) + +// PrefixPartition splits a chunk slice into a frozen cache prefix and a +// dedup-eligible suffix. The split point is the position immediately after +// the last chunk that carries a cache_control marker in its Metadata. +// +// If no cache_control markers are present, the entire slice is returned as +// the suffix (prefix is empty) and FrozenPrefixTokens is 0. +type PrefixPartition struct { + // Prefix contains chunks that must not be reordered or removed. + Prefix []types.Chunk + + // Suffix contains chunks eligible for the full dedup pipeline. + Suffix []types.Chunk + + // PrefixHash is the SHA-256 of the concatenated text of all prefix chunks. + // Stable across requests when the prefix is unchanged. + PrefixHash string + + // FrozenPrefixTokens is the estimated token count of the frozen prefix. + FrozenPrefixTokens int + + // MarkerCount is the number of cache_control markers found in the prefix. + MarkerCount int +} + +// PartitionForCacheAwareDedup partitions chunks for cache-aware deduplication. +// It detects cache_control markers in chunk metadata and freezes everything +// up to and including the last marked chunk. +// +// The caller should run the dedup pipeline only on Partition.Suffix, then +// prepend Partition.Prefix to the result before returning to the client. +func PartitionForCacheAwareDedup(chunks []types.Chunk) PrefixPartition { + lastMarkerIdx := -1 + markerCount := 0 + + for i, c := range chunks { + if hasCacheControl(c) { + lastMarkerIdx = i + markerCount++ + } + } + + if lastMarkerIdx < 0 { + // No markers — entire slice is the suffix. + return PrefixPartition{ + Prefix: nil, + Suffix: chunks, + } + } + + prefix := chunks[:lastMarkerIdx+1] + suffix := chunks[lastMarkerIdx+1:] + + return PrefixPartition{ + Prefix: prefix, + Suffix: suffix, + PrefixHash: hashPrefix(prefix), + FrozenPrefixTokens: estimatePrefixTokens(prefix), + MarkerCount: markerCount, + } +} + +// hasCacheControl returns true when a chunk carries a cache_control marker. +func hasCacheControl(c types.Chunk) bool { + if c.Metadata == nil { + return false + } + v, ok := c.Metadata["cache_control"] + if !ok { + return false + } + switch val := v.(type) { + case string: + return val != "" + case map[string]interface{}: + return len(val) > 0 + case bool: + return val + default: + return v != nil + } +} + +// hashPrefix returns a stable SHA-256 hex digest of the prefix text. +func hashPrefix(chunks []types.Chunk) string { + h := sha256.New() + for _, c := range chunks { + h.Write([]byte(c.Text)) + h.Write([]byte{0}) // null separator + } + return hex.EncodeToString(h.Sum(nil))[:16] +} + +// estimatePrefixTokens approximates the token count of the prefix using the +// 4-chars-per-token heuristic. +func estimatePrefixTokens(chunks []types.Chunk) int { + total := 0 + for _, c := range chunks { + total += (len(c.Text) + 3) / 4 + } + return total +} + +// PrefixAwareStats extends DedupeStats with cache prefix information. +type PrefixAwareStats struct { + // CachePrefixFrozen is true when a cache prefix was detected and frozen. + CachePrefixFrozen bool `json:"cache_prefix_frozen,omitempty"` + + // CachePrefixTokens is the estimated token count of the frozen prefix. + CachePrefixTokens int `json:"cache_prefix_tokens,omitempty"` + + // CachePrefixHash is the stable hash of the frozen prefix. + CachePrefixHash string `json:"cache_prefix_hash,omitempty"` + + // SuffixInputCount is the number of chunks in the dedup-eligible suffix. + SuffixInputCount int `json:"suffix_input_count,omitempty"` + + // SuffixOutputCount is the number of chunks after deduplication. + SuffixOutputCount int `json:"suffix_output_count,omitempty"` +} diff --git a/pkg/cache/prefix_test.go b/pkg/cache/prefix_test.go new file mode 100644 index 0000000..3916d0b --- /dev/null +++ b/pkg/cache/prefix_test.go @@ -0,0 +1,130 @@ +package cache + +import ( + "testing" + + "github.com/Siddhant-K-code/distill/pkg/types" +) + +func makeChunk(id, text string, cacheControl interface{}) types.Chunk { + c := types.Chunk{ID: id, Text: text, Metadata: map[string]interface{}{}} + if cacheControl != nil { + c.Metadata["cache_control"] = cacheControl + } + return c +} + +func TestPartitionForCacheAwareDedup_NoMarkers(t *testing.T) { + chunks := []types.Chunk{ + makeChunk("1", "system prompt", nil), + makeChunk("2", "user message", nil), + makeChunk("3", "assistant reply", nil), + } + p := PartitionForCacheAwareDedup(chunks) + + if len(p.Prefix) != 0 { + t.Errorf("expected empty prefix, got %d chunks", len(p.Prefix)) + } + if len(p.Suffix) != 3 { + t.Errorf("expected 3-chunk suffix, got %d", len(p.Suffix)) + } + if p.FrozenPrefixTokens != 0 { + t.Errorf("expected 0 frozen tokens, got %d", p.FrozenPrefixTokens) + } +} + +func TestPartitionForCacheAwareDedup_SingleMarker(t *testing.T) { + chunks := []types.Chunk{ + makeChunk("1", "You are a helpful assistant.", map[string]interface{}{"type": "ephemeral"}), + makeChunk("2", "What is the capital of France?", nil), + makeChunk("3", "Paris is the capital.", nil), + } + p := PartitionForCacheAwareDedup(chunks) + + if len(p.Prefix) != 1 { + t.Errorf("expected 1-chunk prefix, got %d", len(p.Prefix)) + } + if p.Prefix[0].ID != "1" { + t.Errorf("expected prefix chunk id=1, got %s", p.Prefix[0].ID) + } + if len(p.Suffix) != 2 { + t.Errorf("expected 2-chunk suffix, got %d", len(p.Suffix)) + } + if p.MarkerCount != 1 { + t.Errorf("expected 1 marker, got %d", p.MarkerCount) + } + if p.PrefixHash == "" { + t.Error("expected non-empty PrefixHash") + } + if p.FrozenPrefixTokens <= 0 { + t.Error("expected positive FrozenPrefixTokens") + } +} + +func TestPartitionForCacheAwareDedup_MultipleMarkers(t *testing.T) { + chunks := []types.Chunk{ + makeChunk("1", "System prompt text here.", map[string]interface{}{"type": "ephemeral"}), + makeChunk("2", "Tool definitions JSON schema.", map[string]interface{}{"type": "ephemeral"}), + makeChunk("3", "Dynamic user message.", nil), + makeChunk("4", "Another dynamic message.", nil), + } + p := PartitionForCacheAwareDedup(chunks) + + // Prefix should include everything up to and including the last marker (chunk 2). + if len(p.Prefix) != 2 { + t.Errorf("expected 2-chunk prefix, got %d", len(p.Prefix)) + } + if len(p.Suffix) != 2 { + t.Errorf("expected 2-chunk suffix, got %d", len(p.Suffix)) + } + if p.MarkerCount != 2 { + t.Errorf("expected 2 markers, got %d", p.MarkerCount) + } +} + +func TestPartitionForCacheAwareDedup_MarkerAtEnd(t *testing.T) { + chunks := []types.Chunk{ + makeChunk("1", "chunk one", nil), + makeChunk("2", "chunk two", nil), + makeChunk("3", "chunk three", "ephemeral"), + } + p := PartitionForCacheAwareDedup(chunks) + + // All chunks are in the prefix; suffix is empty. + if len(p.Prefix) != 3 { + t.Errorf("expected 3-chunk prefix, got %d", len(p.Prefix)) + } + if len(p.Suffix) != 0 { + t.Errorf("expected empty suffix, got %d chunks", len(p.Suffix)) + } +} + +func TestPartitionForCacheAwareDedup_HashStability(t *testing.T) { + chunks := []types.Chunk{ + makeChunk("1", "stable system prompt", map[string]interface{}{"type": "ephemeral"}), + makeChunk("2", "dynamic content", nil), + } + + p1 := PartitionForCacheAwareDedup(chunks) + p2 := PartitionForCacheAwareDedup(chunks) + + if p1.PrefixHash != p2.PrefixHash { + t.Errorf("hash not stable across calls: %s != %s", p1.PrefixHash, p2.PrefixHash) + } +} + +func TestPartitionForCacheAwareDedup_HashChangesOnTextChange(t *testing.T) { + chunks1 := []types.Chunk{ + makeChunk("1", "system prompt v1", map[string]interface{}{"type": "ephemeral"}), + } + chunks2 := []types.Chunk{ + makeChunk("1", "system prompt v2", map[string]interface{}{"type": "ephemeral"}), + } + + p1 := PartitionForCacheAwareDedup(chunks1) + p2 := PartitionForCacheAwareDedup(chunks2) + + if p1.PrefixHash == p2.PrefixHash { + t.Error("expected different hashes for different prefix text") + } +} From ef8bab61e676e1435ec9b6b4573fa5a631313ed4 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 2 May 2026 13:11:11 +0000 Subject: [PATCH 2/3] feat(cache): prefix stability validator (#48) Add StabilityValidator to pkg/cache. Tracks prefix hashes per call site across requests and reports StabilityIssue when the rate drops below UnstableThreshold (default 0.8). Includes: - Runtime Check(callSite, chunks): records hash, detects changes after WarmupChecks (default 3), diagnoses likely cause from DynamicPatterns - Static ValidateText(text): one-shot scan for dynamic interpolation patterns (request id, timestamp, uuid, random, etc.) - Stats/AllStats/Reset for observability Co-authored-by: Ona --- README.md | 54 +++++++ pkg/cache/stability.go | 282 ++++++++++++++++++++++++++++++++++++ pkg/cache/stability_test.go | 154 ++++++++++++++++++++ 3 files changed, 490 insertions(+) create mode 100644 pkg/cache/stability.go create mode 100644 pkg/cache/stability_test.go diff --git a/README.md b/README.md index 506b5b4..5ae6bbc 100644 --- a/README.md +++ b/README.md @@ -735,8 +735,60 @@ KV cache for repeated context patterns (system prompts, tool definitions, boiler - **MemoryCache** - In-memory LRU with TTL, configurable size limits (entries and bytes), background cleanup - **PatternDetector** - Identifies cacheable content and emits `CacheAnnotation` per chunk. Use `AnnotateChunksForCache` to get a `CacheControlPlan` — up to 4 `cache_control` markers (Anthropic's limit) placed at the highest-token-count stable chunks. Auto-placement is skipped when the caller has already set markers manually. +- **PrefixPartition** - Splits a chunk slice into a frozen cache prefix and a dedup-eligible suffix. Used by the `preserve_cache_prefix` dedup option to prevent Distill from reordering chunks that appear before a `cache_control` breakpoint. +- **StabilityValidator** - Tracks prefix hashes across requests and detects dynamic content bleeding into cached prefixes. Reports instability with a likely cause and supports static text analysis for pre-flight checks. - **RedisCache** - Interface for distributed deployments (requires external Redis) +#### Cache-aware dedup (`preserve_cache_prefix`) + +Distill's dedup pipeline can reorder chunks to improve context quality. When prompt caching is active, reordering chunks before the `cache_control` breakpoint changes the prefix hash and causes a cache miss. Use `preserve_cache_prefix` to freeze the prefix: + +```json +POST /v1/dedupe +{ + "chunks": [ + {"id": "sys", "text": "You are a helpful assistant.", "cache_control": "ephemeral"}, + {"id": "tool1", "text": "Tool schema JSON...", "cache_control": "ephemeral"}, + {"id": "msg1", "text": "What is the capital of France?"}, + {"id": "msg2", "text": "What is the capital of Germany?"} + ], + "options": {"preserve_cache_prefix": true} +} +``` + +Response stats when prefix is frozen: + +```json +{ + "stats": { + "input_count": 4, "output_count": 3, + "cache_prefix_frozen": true, + "cache_prefix_tokens": 320, + "cache_prefix_hash": "a3f2c1d4e5b6", + "suffix_input_count": 2, + "suffix_output_count": 1 + } +} +``` + +#### Prefix stability validator + +Detects dynamic content (timestamps, request IDs, UUIDs) bleeding into cached prefixes — the most common cause of 0% cache hit rates: + +```go +validator := cache.NewStabilityValidator(cache.DefaultStabilityConfig()) + +// Runtime check — call on every request +issues := validator.Check("agent/planner.go:84", chunks) +for _, issue := range issues { + log.Warnf("%s", issue) // "cache-prefix-unstable: stability=12% — likely dynamic interpolation: request id" +} + +// Static pre-flight check +found := validator.ValidateText(systemPromptText) +// found = ["request id", "timestamp"] if dynamic patterns detected +``` + #### Automatic cache_control placement ```go @@ -820,6 +872,8 @@ Distill is evolving from a dedup utility into a context intelligence layer. Here | **Session-aware cache boundary manager** | [#51](https://github.com/Siddhant-K-code/distill/issues/51) | Shipped | Auto-advances `cache_control` placement as sessions grow. Stable entries (present ≥ 2 turns unmodified) are included in the cached prefix; boundary retreats when content changes. | | **Cache write cost accounting** | [#52](https://github.com/Siddhant-K-code/distill/issues/52) | Shipped | 9 new Prometheus metrics covering Anthropic prompt cache token usage, hit rate, write efficiency, and boundary position. Feed API response usage via `RecordCacheUsage`. | | **Memory decay lifecycle events** | [#54](https://github.com/Siddhant-K-code/distill/issues/54) | Shipped | `DecayWorker` emits `EventCompressed` and `EventEvicted` on each transition. `RecallResult` includes a `CacheBoundaryHint` for high-relevance entries. | +| **Cache-aware dedup** | [#50](https://github.com/Siddhant-K-code/distill/issues/50) | Shipped | `preserve_cache_prefix` option freezes chunks before the last `cache_control` marker so dedup cannot reorder them. Prefix hash and token count reported in stats. | +| **Prefix stability validator** | [#48](https://github.com/Siddhant-K-code/distill/issues/48) | Shipped | `StabilityValidator` tracks prefix hashes across requests and detects dynamic content (timestamps, request IDs, UUIDs) bleeding into cached prefixes. | ### Code Intelligence diff --git a/pkg/cache/stability.go b/pkg/cache/stability.go new file mode 100644 index 0000000..055af4e --- /dev/null +++ b/pkg/cache/stability.go @@ -0,0 +1,282 @@ +package cache + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/Siddhant-K-code/distill/pkg/types" +) + +// StabilityRecord tracks prefix hash observations for a single call site. +type StabilityRecord struct { + CallSite string + Hashes []string + FirstSeen time.Time + LastSeen time.Time + TotalChecks int + Changes int +} + +// StabilityRate returns the fraction of checks where the prefix was unchanged +// (1.0 = perfectly stable, 0.0 = changes every request). +func (r *StabilityRecord) StabilityRate() float64 { + if r.TotalChecks <= 1 { + return 1.0 + } + return 1.0 - float64(r.Changes)/float64(r.TotalChecks-1) +} + +// StabilityIssue describes a detected prefix instability. +type StabilityIssue struct { + // CallSite is the identifier provided by the caller (e.g. "file:line"). + CallSite string + + // StabilityRate is the fraction of requests where the prefix was unchanged. + StabilityRate float64 + + // TotalChecks is the number of requests observed. + TotalChecks int + + // Changes is the number of times the prefix hash changed. + Changes int + + // PreviousHash is the last stable prefix hash. + PreviousHash string + + // CurrentHash is the new (changed) prefix hash. + CurrentHash string + + // Diff is a short human-readable description of what changed. + Diff string + + // LikelyCause is a heuristic explanation of the instability. + LikelyCause string +} + +func (i StabilityIssue) Error() string { + return fmt.Sprintf("cache-prefix-unstable %s: stability=%.0f%% (%d/%d changes) — %s", + i.CallSite, i.StabilityRate*100, i.Changes, i.TotalChecks, i.LikelyCause) +} + +// StabilityValidator tracks prefix hashes across requests and detects +// dynamic content bleeding into cached prefixes. +// +// Usage: +// +// validator := cache.NewStabilityValidator(cache.DefaultStabilityConfig()) +// issues := validator.Check("agent/planner.go:84", chunks) +// if len(issues) > 0 { +// log.Warn(issues[0]) +// } +type StabilityValidator struct { + cfg StabilityConfig + mu sync.Mutex + records map[string]*StabilityRecord +} + +// StabilityConfig controls the validator's sensitivity. +type StabilityConfig struct { + // WarmupChecks is the number of requests to observe before reporting + // issues. Avoids false positives on the first few requests. + // Default: 3. + WarmupChecks int + + // UnstableThreshold is the stability rate below which an issue is + // reported. Default: 0.8 (report if prefix changes > 20% of requests). + UnstableThreshold float64 + + // MaxHashHistory is the maximum number of hashes to retain per call site. + // Default: 100. + MaxHashHistory int + + // DynamicPatterns is a list of substrings that, when found in the prefix + // text, are flagged as likely causes of instability. + DynamicPatterns []string +} + +// DefaultStabilityConfig returns sensible defaults. +func DefaultStabilityConfig() StabilityConfig { + return StabilityConfig{ + WarmupChecks: 3, + UnstableThreshold: 0.8, + MaxHashHistory: 100, + DynamicPatterns: []string{ + "request_id", "requestid", "request-id", "request id", + "timestamp", "datetime", "time.now", "date.now", + "uuid", "random", "rand.", + "user_id", "userid", "user-id", + "session_id", "sessionid", + "nonce", "token:", + }, + } +} + +// NewStabilityValidator creates a new validator. +func NewStabilityValidator(cfg StabilityConfig) *StabilityValidator { + if cfg.WarmupChecks <= 0 { + cfg.WarmupChecks = 3 + } + if cfg.UnstableThreshold <= 0 { + cfg.UnstableThreshold = 0.8 + } + if cfg.MaxHashHistory <= 0 { + cfg.MaxHashHistory = 100 + } + if len(cfg.DynamicPatterns) == 0 { + cfg.DynamicPatterns = DefaultStabilityConfig().DynamicPatterns + } + return &StabilityValidator{ + cfg: cfg, + records: make(map[string]*StabilityRecord), + } +} + +// Check records the current prefix hash for callSite and returns any +// stability issues detected. Returns nil when the prefix is stable or +// the validator is still in the warmup period. +// +// chunks should be the full message array (including cache_control markers). +// The prefix is extracted automatically using PartitionForCacheAwareDedup. +func (v *StabilityValidator) Check(callSite string, chunks []types.Chunk) []StabilityIssue { + partition := PartitionForCacheAwareDedup(chunks) + + // If no cache_control markers, nothing to validate. + if partition.MarkerCount == 0 { + return nil + } + + currentHash := partition.PrefixHash + prefixText := extractPrefixText(partition.Prefix) + + v.mu.Lock() + defer v.mu.Unlock() + + rec, exists := v.records[callSite] + if !exists { + rec = &StabilityRecord{ + CallSite: callSite, + FirstSeen: time.Now(), + } + v.records[callSite] = rec + } + + rec.LastSeen = time.Now() + rec.TotalChecks++ + + var prevHash string + if len(rec.Hashes) > 0 { + prevHash = rec.Hashes[len(rec.Hashes)-1] + } + + changed := prevHash != "" && prevHash != currentHash + if changed { + rec.Changes++ + } + + // Maintain bounded history. + rec.Hashes = append(rec.Hashes, currentHash) + if len(rec.Hashes) > v.cfg.MaxHashHistory { + rec.Hashes = rec.Hashes[len(rec.Hashes)-v.cfg.MaxHashHistory:] + } + + // Still warming up. + if rec.TotalChecks < v.cfg.WarmupChecks { + return nil + } + + rate := rec.StabilityRate() + if rate >= v.cfg.UnstableThreshold { + return nil + } + + issue := StabilityIssue{ + CallSite: callSite, + StabilityRate: rate, + TotalChecks: rec.TotalChecks, + Changes: rec.Changes, + PreviousHash: prevHash, + CurrentHash: currentHash, + LikelyCause: v.diagnoseCause(prefixText), + } + + if changed && prevHash != "" { + issue.Diff = fmt.Sprintf("prefix hash changed: %s → %s", prevHash[:8], currentHash[:8]) + } + + return []StabilityIssue{issue} +} + +// ValidateText performs a one-shot static analysis of prefix text for +// patterns that commonly cause cache instability. Returns a list of +// suspected dynamic patterns found in the text. +// +// This is useful as a pre-flight check before sending the first request. +func (v *StabilityValidator) ValidateText(prefixText string) []string { + lower := strings.ToLower(prefixText) + var found []string + seen := map[string]bool{} + for _, pattern := range v.cfg.DynamicPatterns { + if !seen[pattern] && strings.Contains(lower, pattern) { + found = append(found, pattern) + seen[pattern] = true + } + } + return found +} + +// Stats returns the current stability record for a call site, or nil if +// no observations have been recorded. +func (v *StabilityValidator) Stats(callSite string) *StabilityRecord { + v.mu.Lock() + defer v.mu.Unlock() + r := v.records[callSite] + if r == nil { + return nil + } + // Return a copy to avoid data races. + cp := *r + cp.Hashes = append([]string(nil), r.Hashes...) + return &cp +} + +// AllStats returns stability records for all observed call sites. +func (v *StabilityValidator) AllStats() []*StabilityRecord { + v.mu.Lock() + defer v.mu.Unlock() + out := make([]*StabilityRecord, 0, len(v.records)) + for _, r := range v.records { + cp := *r + cp.Hashes = append([]string(nil), r.Hashes...) + out = append(out, &cp) + } + return out +} + +// Reset clears all recorded observations for a call site. +func (v *StabilityValidator) Reset(callSite string) { + v.mu.Lock() + defer v.mu.Unlock() + delete(v.records, callSite) +} + +// diagnoseCause scans prefix text for known dynamic patterns and returns a +// human-readable explanation. +func (v *StabilityValidator) diagnoseCause(text string) string { + found := v.ValidateText(text) + if len(found) == 0 { + return "unknown — prefix content changes between requests" + } + return fmt.Sprintf("likely dynamic interpolation: %s", strings.Join(found, ", ")) +} + +// extractPrefixText concatenates the text of all prefix chunks. +func extractPrefixText(chunks []types.Chunk) string { + var sb strings.Builder + for _, c := range chunks { + sb.WriteString(c.Text) + sb.WriteByte('\n') + } + return sb.String() +} diff --git a/pkg/cache/stability_test.go b/pkg/cache/stability_test.go new file mode 100644 index 0000000..62f8e97 --- /dev/null +++ b/pkg/cache/stability_test.go @@ -0,0 +1,154 @@ +package cache + +import ( + "fmt" + "testing" + + "github.com/Siddhant-K-code/distill/pkg/types" +) + +func stableChunks(systemPrompt string) []types.Chunk { + return []types.Chunk{ + { + ID: "sys", + Text: systemPrompt, + Metadata: map[string]interface{}{ + "cache_control": map[string]interface{}{"type": "ephemeral"}, + }, + }, + {ID: "user", Text: "What is 2+2?"}, + } +} + +func TestStabilityValidator_StablePrefix(t *testing.T) { + v := NewStabilityValidator(StabilityConfig{ + WarmupChecks: 2, + UnstableThreshold: 0.8, + MaxHashHistory: 10, + DynamicPatterns: DefaultStabilityConfig().DynamicPatterns, + }) + + chunks := stableChunks("You are a helpful assistant.") + + // Warmup period — no issues expected. + for i := 0; i < 5; i++ { + issues := v.Check("agent.go:42", chunks) + if len(issues) > 0 { + t.Errorf("turn %d: expected no issues for stable prefix, got %v", i, issues) + } + } + + stats := v.Stats("agent.go:42") + if stats == nil { + t.Fatal("expected stats, got nil") + } + if stats.StabilityRate() != 1.0 { + t.Errorf("expected 1.0 stability rate, got %f", stats.StabilityRate()) + } +} + +func TestStabilityValidator_UnstablePrefix(t *testing.T) { + v := NewStabilityValidator(StabilityConfig{ + WarmupChecks: 2, + UnstableThreshold: 0.8, + MaxHashHistory: 10, + DynamicPatterns: DefaultStabilityConfig().DynamicPatterns, + }) + + // Each call uses a different request ID in the prefix — simulates dynamic content. + for i := 0; i < 6; i++ { + chunks := stableChunks(fmt.Sprintf("You are helpful. Request ID: req-%d", i)) + issues := v.Check("reviewer.go:201", chunks) + if i >= 2 && len(issues) > 0 { + // Found instability — test passes. + return + } + } + t.Error("expected instability to be detected after warmup, but no issues were reported") +} + +func TestStabilityValidator_WarmupPeriod(t *testing.T) { + v := NewStabilityValidator(StabilityConfig{ + WarmupChecks: 5, + UnstableThreshold: 0.5, + MaxHashHistory: 10, + DynamicPatterns: DefaultStabilityConfig().DynamicPatterns, + }) + + // Even with changing prefixes, no issues during warmup. + for i := 0; i < 4; i++ { + chunks := stableChunks(fmt.Sprintf("Prompt version %d", i)) + issues := v.Check("planner.go:84", chunks) + if len(issues) > 0 { + t.Errorf("turn %d: expected no issues during warmup, got %v", i, issues) + } + } +} + +func TestStabilityValidator_NoMarkers(t *testing.T) { + v := NewStabilityValidator(DefaultStabilityConfig()) + + // Chunks without cache_control markers — nothing to validate. + chunks := []types.Chunk{ + {ID: "1", Text: "no marker here"}, + {ID: "2", Text: "also no marker"}, + } + issues := v.Check("anywhere.go:1", chunks) + if len(issues) > 0 { + t.Errorf("expected no issues for chunks without markers, got %v", issues) + } +} + +func TestStabilityValidator_ValidateText_DynamicPatterns(t *testing.T) { + v := NewStabilityValidator(DefaultStabilityConfig()) + + tests := []struct { + text string + wantHit bool + }{ + {"You are a helpful assistant.", false}, + {"Request ID: abc-123 is included here.", true}, + {"Current timestamp: 2026-05-02T10:00:00Z", true}, + {"User UUID: 550e8400-e29b-41d4-a716-446655440000", true}, + {"Static system prompt with no dynamic fields.", false}, + } + + for _, tt := range tests { + found := v.ValidateText(tt.text) + if tt.wantHit && len(found) == 0 { + t.Errorf("expected dynamic pattern in %q, found none", tt.text) + } + if !tt.wantHit && len(found) > 0 { + t.Errorf("unexpected dynamic pattern in %q: %v", tt.text, found) + } + } +} + +func TestStabilityValidator_Reset(t *testing.T) { + v := NewStabilityValidator(DefaultStabilityConfig()) + chunks := stableChunks("static prompt") + + for i := 0; i < 3; i++ { + v.Check("file.go:1", chunks) + } + if v.Stats("file.go:1") == nil { + t.Fatal("expected stats before reset") + } + + v.Reset("file.go:1") + if v.Stats("file.go:1") != nil { + t.Error("expected nil stats after reset") + } +} + +func TestStabilityValidator_AllStats(t *testing.T) { + v := NewStabilityValidator(DefaultStabilityConfig()) + + v.Check("a.go:1", stableChunks("prompt a")) + v.Check("b.go:2", stableChunks("prompt b")) + + all := v.AllStats() + if len(all) != 2 { + t.Errorf("expected 2 records, got %d", len(all)) + } +} From 67f26983b53e5317fe9a1073d41f30060c029108 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 2 May 2026 14:01:33 +0000 Subject: [PATCH 3/3] fix(cache): apply preserve_cache_prefix to handleDedupeStream (#50) handleDedupeStream was missing the partition logic added to handleDedupe. Both handlers now freeze the cache prefix, run dedup only on the suffix, and report cache_prefix_* fields in stats. Co-authored-by: Ona --- cmd/api.go | 57 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/cmd/api.go b/cmd/api.go index 1e9f7d8..23d7c9d 100644 --- a/cmd/api.go +++ b/cmd/api.go @@ -558,23 +558,34 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) { start := time.Now() - // Convert to internal types + // Convert to internal types, preserving cache_control metadata. chunks := make([]types.Chunk, len(req.Chunks)) needsEmbedding := false - for i, c := range req.Chunks { chunks[i] = types.Chunk{ ID: c.ID, Text: c.Text, Embedding: c.Embedding, Score: c.Score, + Metadata: make(map[string]interface{}), + } + if c.CacheControl != "" { + chunks[i].Metadata["cache_control"] = c.CacheControl } if len(c.Embedding) == 0 { needsEmbedding = true } } - // Stage 1: Embedding + // Partition into frozen prefix + dedup-eligible suffix when requested. + var partition distillcache.PrefixPartition + dedupChunks := chunks + if req.Options.PreserveCachePrefix { + partition = distillcache.PartitionForCacheAwareDedup(chunks) + dedupChunks = partition.Suffix + } + + // Stage 1: Embedding (suffix only). if needsEmbedding { if s.embedder == nil { _ = sw.SendError(sse.StageEmbedding, "Embeddings required but no embedding provider configured. Either provide embeddings in request or configure OPENAI_API_KEY.") @@ -583,9 +594,9 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) { _ = sw.SendProgress(sse.StageEmbedding, 0) - _, embSpan := s.tracing.StartEmbedding(ctx, len(chunks)) - texts := make([]string, len(chunks)) - for i, c := range chunks { + _, embSpan := s.tracing.StartEmbedding(ctx, len(dedupChunks)) + texts := make([]string, len(dedupChunks)) + for i, c := range dedupChunks { texts[i] = c.Text } @@ -598,8 +609,8 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) { } embSpan.End() - for i := range chunks { - chunks[i].Embedding = embeddings[i] + for i := range dedupChunks { + dedupChunks[i].Embedding = embeddings[i] } _ = sw.SendProgress(sse.StageEmbedding, 1.0) @@ -619,20 +630,20 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) { targetK = 0 } - // Stage 2: Clustering + // Stage 2: Clustering (suffix only). _ = sw.SendProgress(sse.StageClustering, 0) - _, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold) + _, clusterSpan := s.tracing.StartClustering(ctx, len(dedupChunks), threshold) clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{ Threshold: threshold, Linkage: "average", }) - clusterResult := clusterer.Cluster(chunks) + clusterResult := clusterer.Cluster(dedupChunks) clusterSpan.End() _ = sw.SendProgressWithStats(sse.StageClustering, 1.0, map[string]interface{}{ "clusters_formed": clusterResult.ClusterCount, - "input_count": len(chunks), + "input_count": len(dedupChunks), }) // Stage 3: Selection @@ -667,13 +678,16 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) { }) } + // Prepend frozen prefix to deduped suffix. + finalChunks := append(partition.Prefix, representatives...) + latency := time.Since(start) - telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency) + telemetry.RecordResult(rootSpan, len(req.Chunks), len(finalChunks), clusterResult.ClusterCount, latency) // Build response chunks - outputChunks := make([]DedupeChunkResponse, len(representatives)) - for i, c := range representatives { + outputChunks := make([]DedupeChunkResponse, len(finalChunks)) + for i, c := range finalChunks { outputChunks[i] = DedupeChunkResponse{ ID: c.ID, Text: c.Text, @@ -684,18 +698,25 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) { reductionPct := 0 if len(req.Chunks) > 0 { - reductionPct = int((1 - float64(len(representatives))/float64(len(req.Chunks))) * 100) + reductionPct = int((1 - float64(len(finalChunks))/float64(len(req.Chunks))) * 100) } stats := DedupeStats{ InputCount: len(req.Chunks), - OutputCount: len(representatives), + OutputCount: len(finalChunks), ClusterCount: clusterResult.ClusterCount, ReductionPct: reductionPct, LatencyMs: latency.Milliseconds(), } + if req.Options.PreserveCachePrefix && partition.MarkerCount > 0 { + stats.CachePrefixFrozen = true + stats.CachePrefixTokens = partition.FrozenPrefixTokens + stats.CachePrefixHash = partition.PrefixHash + stats.SuffixInputCount = len(partition.Suffix) + stats.SuffixOutputCount = len(representatives) + } - s.metrics.RecordDedup("/v1/dedupe/stream", len(req.Chunks), len(representatives), clusterResult.ClusterCount) + s.metrics.RecordDedup("/v1/dedupe/stream", len(req.Chunks), len(finalChunks), clusterResult.ClusterCount) // Send final complete event _ = sw.SendComplete(outputChunks, stats)