Skip to content

Commit 0917f08

Browse files
feat(cache): cache-aware dedup and prefix stability validator (#59)
Closes #50 Closes #48 preserve_cache_prefix option freezes chunks before the last cache_control marker so the dedup pipeline cannot reorder them. Applied to both handleDedupe and handleDedupeStream. StabilityValidator tracks prefix hashes per call site, reports instability when rate drops below threshold, and provides static ValidateText for pre-flight pattern scanning. Co-authored-by: Ona <no-reply@ona.com>
1 parent c172c7b commit 0917f08

6 files changed

Lines changed: 858 additions & 45 deletions

File tree

README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,8 +735,60 @@ KV cache for repeated context patterns (system prompts, tool definitions, boiler
735735

736736
- **MemoryCache** - In-memory LRU with TTL, configurable size limits (entries and bytes), background cleanup
737737
- **PatternDetector** - Identifies cacheable content and emits `CacheAnnotation` per chunk. Use `AnnotateChunksForCache` to get a `CacheControlPlan` — up to 4 `cache_control` markers (Anthropic's limit) placed at the highest-token-count stable chunks. Auto-placement is skipped when the caller has already set markers manually.
738+
- **PrefixPartition** - Splits a chunk slice into a frozen cache prefix and a dedup-eligible suffix. Used by the `preserve_cache_prefix` dedup option to prevent Distill from reordering chunks that appear before a `cache_control` breakpoint.
739+
- **StabilityValidator** - Tracks prefix hashes across requests and detects dynamic content bleeding into cached prefixes. Reports instability with a likely cause and supports static text analysis for pre-flight checks.
738740
- **RedisCache** - Interface for distributed deployments (requires external Redis)
739741

742+
#### Cache-aware dedup (`preserve_cache_prefix`)
743+
744+
Distill's dedup pipeline can reorder chunks to improve context quality. When prompt caching is active, reordering chunks before the `cache_control` breakpoint changes the prefix hash and causes a cache miss. Use `preserve_cache_prefix` to freeze the prefix:
745+
746+
```json
747+
POST /v1/dedupe
748+
{
749+
"chunks": [
750+
{"id": "sys", "text": "You are a helpful assistant.", "cache_control": "ephemeral"},
751+
{"id": "tool1", "text": "Tool schema JSON...", "cache_control": "ephemeral"},
752+
{"id": "msg1", "text": "What is the capital of France?"},
753+
{"id": "msg2", "text": "What is the capital of Germany?"}
754+
],
755+
"options": {"preserve_cache_prefix": true}
756+
}
757+
```
758+
759+
Response stats when prefix is frozen:
760+
761+
```json
762+
{
763+
"stats": {
764+
"input_count": 4, "output_count": 3,
765+
"cache_prefix_frozen": true,
766+
"cache_prefix_tokens": 320,
767+
"cache_prefix_hash": "a3f2c1d4e5b6",
768+
"suffix_input_count": 2,
769+
"suffix_output_count": 1
770+
}
771+
}
772+
```
773+
774+
#### Prefix stability validator
775+
776+
Detects dynamic content (timestamps, request IDs, UUIDs) bleeding into cached prefixes — the most common cause of 0% cache hit rates:
777+
778+
```go
779+
validator := cache.NewStabilityValidator(cache.DefaultStabilityConfig())
780+
781+
// Runtime check — call on every request
782+
issues := validator.Check("agent/planner.go:84", chunks)
783+
for _, issue := range issues {
784+
log.Warnf("%s", issue) // "cache-prefix-unstable: stability=12% — likely dynamic interpolation: request id"
785+
}
786+
787+
// Static pre-flight check
788+
found := validator.ValidateText(systemPromptText)
789+
// found = ["request id", "timestamp"] if dynamic patterns detected
790+
```
791+
740792
#### Automatic cache_control placement
741793

742794
```go
@@ -820,6 +872,8 @@ Distill is evolving from a dedup utility into a context intelligence layer. Here
820872
| **Session-aware cache boundary manager** | [#51](https://github.com/Siddhant-K-code/distill/issues/51) | Shipped | Auto-advances `cache_control` placement as sessions grow. Stable entries (present ≥ 2 turns unmodified) are included in the cached prefix; boundary retreats when content changes. |
821873
| **Cache write cost accounting** | [#52](https://github.com/Siddhant-K-code/distill/issues/52) | Shipped | 9 new Prometheus metrics covering Anthropic prompt cache token usage, hit rate, write efficiency, and boundary position. Feed API response usage via `RecordCacheUsage`. |
822874
| **Memory decay lifecycle events** | [#54](https://github.com/Siddhant-K-code/distill/issues/54) | Shipped | `DecayWorker` emits `EventCompressed` and `EventEvicted` on each transition. `RecallResult` includes a `CacheBoundaryHint` for high-relevance entries. |
875+
| **Cache-aware dedup** | [#50](https://github.com/Siddhant-K-code/distill/issues/50) | Shipped | `preserve_cache_prefix` option freezes chunks before the last `cache_control` marker so dedup cannot reorder them. Prefix hash and token count reported in stats. |
876+
| **Prefix stability validator** | [#48](https://github.com/Siddhant-K-code/distill/issues/48) | Shipped | `StabilityValidator` tracks prefix hashes across requests and detects dynamic content (timestamps, request IDs, UUIDs) bleeding into cached prefixes. |
823877
824878
### Code Intelligence
825879

cmd/api.go

Lines changed: 110 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"syscall"
1212
"time"
1313

14+
distillcache "github.com/Siddhant-K-code/distill/pkg/cache"
1415
"github.com/Siddhant-K-code/distill/pkg/contextlab"
1516
"github.com/Siddhant-K-code/distill/pkg/embedding/openai"
1617
"github.com/Siddhant-K-code/distill/pkg/metrics"
@@ -61,14 +62,27 @@ type DedupeRequest struct {
6162
Threshold float64 `json:"threshold,omitempty"`
6263
Lambda float64 `json:"lambda,omitempty"`
6364
TargetK int `json:"target_k,omitempty"`
65+
Options DedupeOptions `json:"options,omitempty"`
66+
}
67+
68+
// DedupeOptions controls optional dedup behaviour.
69+
type DedupeOptions struct {
70+
// PreserveCachePrefix freezes chunks before the last cache_control marker
71+
// so the dedup pipeline cannot reorder or remove them. This prevents
72+
// Distill from silently invalidating Anthropic prompt cache prefixes.
73+
PreserveCachePrefix bool `json:"preserve_cache_prefix,omitempty"`
6474
}
6575

6676
// DedupeChunk represents a chunk in the request.
6777
type DedupeChunk struct {
68-
ID string `json:"id"`
69-
Text string `json:"text"`
70-
Embedding []float32 `json:"embedding,omitempty"`
71-
Score float32 `json:"score,omitempty"`
78+
ID string `json:"id"`
79+
Text string `json:"text"`
80+
Embedding []float32 `json:"embedding,omitempty"`
81+
Score float32 `json:"score,omitempty"`
82+
// CacheControl mirrors the Anthropic cache_control field. When non-empty,
83+
// this chunk is treated as a cache boundary marker. Used with
84+
// options.preserve_cache_prefix to freeze the prefix during dedup.
85+
CacheControl string `json:"cache_control,omitempty"`
7286
}
7387

7488
// DedupeResponse is the JSON response for /v1/dedupe.
@@ -92,6 +106,13 @@ type DedupeStats struct {
92106
ClusterCount int `json:"cluster_count"`
93107
ReductionPct int `json:"reduction_pct"`
94108
LatencyMs int64 `json:"latency_ms"`
109+
110+
// Cache prefix fields — populated when options.preserve_cache_prefix=true.
111+
CachePrefixFrozen bool `json:"cache_prefix_frozen,omitempty"`
112+
CachePrefixTokens int `json:"cache_prefix_tokens,omitempty"`
113+
CachePrefixHash string `json:"cache_prefix_hash,omitempty"`
114+
SuffixInputCount int `json:"suffix_input_count,omitempty"`
115+
SuffixOutputCount int `json:"suffix_output_count,omitempty"`
95116
}
96117

97118
// APIServer holds the API server state.
@@ -346,32 +367,43 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
346367

347368
start := time.Now()
348369

349-
// Convert to internal types
370+
// Convert to internal types, preserving cache_control metadata.
350371
chunks := make([]types.Chunk, len(req.Chunks))
351372
needsEmbedding := false
352-
353373
for i, c := range req.Chunks {
354374
chunks[i] = types.Chunk{
355375
ID: c.ID,
356376
Text: c.Text,
357377
Embedding: c.Embedding,
358378
Score: c.Score,
379+
Metadata: make(map[string]interface{}),
380+
}
381+
if c.CacheControl != "" {
382+
chunks[i].Metadata["cache_control"] = c.CacheControl
359383
}
360384
if len(c.Embedding) == 0 {
361385
needsEmbedding = true
362386
}
363387
}
364388

365-
// Generate embeddings if needed
389+
// Partition into frozen prefix + dedup-eligible suffix when requested.
390+
var partition distillcache.PrefixPartition
391+
dedupChunks := chunks
392+
if req.Options.PreserveCachePrefix {
393+
partition = distillcache.PartitionForCacheAwareDedup(chunks)
394+
dedupChunks = partition.Suffix
395+
}
396+
397+
// Generate embeddings if needed (only for the dedup-eligible suffix).
366398
if needsEmbedding {
367399
if s.embedder == nil {
368400
http.Error(w, "Embeddings required but no embedding provider configured. Either provide embeddings in request or configure OPENAI_API_KEY.", http.StatusBadRequest)
369401
return
370402
}
371403

372-
_, embSpan := s.tracing.StartEmbedding(ctx, len(chunks))
373-
texts := make([]string, len(chunks))
374-
for i, c := range chunks {
404+
_, embSpan := s.tracing.StartEmbedding(ctx, len(dedupChunks))
405+
texts := make([]string, len(dedupChunks))
406+
for i, c := range dedupChunks {
375407
texts[i] = c.Text
376408
}
377409

@@ -384,8 +416,8 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
384416
}
385417
embSpan.End()
386418

387-
for i := range chunks {
388-
chunks[i].Embedding = embeddings[i]
419+
for i := range dedupChunks {
420+
dedupChunks[i].Embedding = embeddings[i]
389421
}
390422
}
391423

@@ -403,13 +435,13 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
403435
targetK = 0 // Will be set to cluster count
404436
}
405437

406-
// Cluster
407-
_, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold)
438+
// Cluster the dedup-eligible suffix only.
439+
_, clusterSpan := s.tracing.StartClustering(ctx, len(dedupChunks), threshold)
408440
clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{
409441
Threshold: threshold,
410442
Linkage: "average",
411443
})
412-
clusterResult := clusterer.Cluster(chunks)
444+
clusterResult := clusterer.Cluster(dedupChunks)
413445
clusterSpan.End()
414446

415447
// Select representatives
@@ -432,14 +464,17 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
432464
mmrSpan.End()
433465
}
434466

467+
// Prepend the frozen prefix to the deduped suffix.
468+
finalChunks := append(partition.Prefix, representatives...)
469+
435470
latency := time.Since(start)
436471

437472
// Record result on root span
438-
telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency)
473+
telemetry.RecordResult(rootSpan, len(req.Chunks), len(finalChunks), clusterResult.ClusterCount, latency)
439474

440475
// Build response
441-
outputChunks := make([]DedupeChunkResponse, len(representatives))
442-
for i, c := range representatives {
476+
outputChunks := make([]DedupeChunkResponse, len(finalChunks))
477+
for i, c := range finalChunks {
443478
outputChunks[i] = DedupeChunkResponse{
444479
ID: c.ID,
445480
Text: c.Text,
@@ -450,22 +485,31 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
450485

451486
reductionPct := 0
452487
if len(req.Chunks) > 0 {
453-
reductionPct = int((1 - float64(len(representatives))/float64(len(req.Chunks))) * 100)
488+
reductionPct = int((1 - float64(len(finalChunks))/float64(len(req.Chunks))) * 100)
489+
}
490+
491+
stats := DedupeStats{
492+
InputCount: len(req.Chunks),
493+
OutputCount: len(finalChunks),
494+
ClusterCount: clusterResult.ClusterCount,
495+
ReductionPct: reductionPct,
496+
LatencyMs: latency.Milliseconds(),
497+
}
498+
if req.Options.PreserveCachePrefix && partition.MarkerCount > 0 {
499+
stats.CachePrefixFrozen = true
500+
stats.CachePrefixTokens = partition.FrozenPrefixTokens
501+
stats.CachePrefixHash = partition.PrefixHash
502+
stats.SuffixInputCount = len(partition.Suffix)
503+
stats.SuffixOutputCount = len(representatives)
454504
}
455505

456506
resp := DedupeResponse{
457507
Chunks: outputChunks,
458-
Stats: DedupeStats{
459-
InputCount: len(req.Chunks),
460-
OutputCount: len(representatives),
461-
ClusterCount: clusterResult.ClusterCount,
462-
ReductionPct: reductionPct,
463-
LatencyMs: latency.Milliseconds(),
464-
},
508+
Stats: stats,
465509
}
466510

467511
// Record dedup-specific metrics
468-
s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(representatives), clusterResult.ClusterCount)
512+
s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(finalChunks), clusterResult.ClusterCount)
469513

470514
w.Header().Set("Content-Type", "application/json")
471515
_ = json.NewEncoder(w).Encode(resp)
@@ -514,23 +558,34 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
514558

515559
start := time.Now()
516560

517-
// Convert to internal types
561+
// Convert to internal types, preserving cache_control metadata.
518562
chunks := make([]types.Chunk, len(req.Chunks))
519563
needsEmbedding := false
520-
521564
for i, c := range req.Chunks {
522565
chunks[i] = types.Chunk{
523566
ID: c.ID,
524567
Text: c.Text,
525568
Embedding: c.Embedding,
526569
Score: c.Score,
570+
Metadata: make(map[string]interface{}),
571+
}
572+
if c.CacheControl != "" {
573+
chunks[i].Metadata["cache_control"] = c.CacheControl
527574
}
528575
if len(c.Embedding) == 0 {
529576
needsEmbedding = true
530577
}
531578
}
532579

533-
// Stage 1: Embedding
580+
// Partition into frozen prefix + dedup-eligible suffix when requested.
581+
var partition distillcache.PrefixPartition
582+
dedupChunks := chunks
583+
if req.Options.PreserveCachePrefix {
584+
partition = distillcache.PartitionForCacheAwareDedup(chunks)
585+
dedupChunks = partition.Suffix
586+
}
587+
588+
// Stage 1: Embedding (suffix only).
534589
if needsEmbedding {
535590
if s.embedder == nil {
536591
_ = sw.SendError(sse.StageEmbedding, "Embeddings required but no embedding provider configured. Either provide embeddings in request or configure OPENAI_API_KEY.")
@@ -539,9 +594,9 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
539594

540595
_ = sw.SendProgress(sse.StageEmbedding, 0)
541596

542-
_, embSpan := s.tracing.StartEmbedding(ctx, len(chunks))
543-
texts := make([]string, len(chunks))
544-
for i, c := range chunks {
597+
_, embSpan := s.tracing.StartEmbedding(ctx, len(dedupChunks))
598+
texts := make([]string, len(dedupChunks))
599+
for i, c := range dedupChunks {
545600
texts[i] = c.Text
546601
}
547602

@@ -554,8 +609,8 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
554609
}
555610
embSpan.End()
556611

557-
for i := range chunks {
558-
chunks[i].Embedding = embeddings[i]
612+
for i := range dedupChunks {
613+
dedupChunks[i].Embedding = embeddings[i]
559614
}
560615

561616
_ = sw.SendProgress(sse.StageEmbedding, 1.0)
@@ -575,20 +630,20 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
575630
targetK = 0
576631
}
577632

578-
// Stage 2: Clustering
633+
// Stage 2: Clustering (suffix only).
579634
_ = sw.SendProgress(sse.StageClustering, 0)
580635

581-
_, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold)
636+
_, clusterSpan := s.tracing.StartClustering(ctx, len(dedupChunks), threshold)
582637
clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{
583638
Threshold: threshold,
584639
Linkage: "average",
585640
})
586-
clusterResult := clusterer.Cluster(chunks)
641+
clusterResult := clusterer.Cluster(dedupChunks)
587642
clusterSpan.End()
588643

589644
_ = sw.SendProgressWithStats(sse.StageClustering, 1.0, map[string]interface{}{
590645
"clusters_formed": clusterResult.ClusterCount,
591-
"input_count": len(chunks),
646+
"input_count": len(dedupChunks),
592647
})
593648

594649
// Stage 3: Selection
@@ -623,13 +678,16 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
623678
})
624679
}
625680

681+
// Prepend frozen prefix to deduped suffix.
682+
finalChunks := append(partition.Prefix, representatives...)
683+
626684
latency := time.Since(start)
627685

628-
telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency)
686+
telemetry.RecordResult(rootSpan, len(req.Chunks), len(finalChunks), clusterResult.ClusterCount, latency)
629687

630688
// Build response chunks
631-
outputChunks := make([]DedupeChunkResponse, len(representatives))
632-
for i, c := range representatives {
689+
outputChunks := make([]DedupeChunkResponse, len(finalChunks))
690+
for i, c := range finalChunks {
633691
outputChunks[i] = DedupeChunkResponse{
634692
ID: c.ID,
635693
Text: c.Text,
@@ -640,18 +698,25 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
640698

641699
reductionPct := 0
642700
if len(req.Chunks) > 0 {
643-
reductionPct = int((1 - float64(len(representatives))/float64(len(req.Chunks))) * 100)
701+
reductionPct = int((1 - float64(len(finalChunks))/float64(len(req.Chunks))) * 100)
644702
}
645703

646704
stats := DedupeStats{
647705
InputCount: len(req.Chunks),
648-
OutputCount: len(representatives),
706+
OutputCount: len(finalChunks),
649707
ClusterCount: clusterResult.ClusterCount,
650708
ReductionPct: reductionPct,
651709
LatencyMs: latency.Milliseconds(),
652710
}
711+
if req.Options.PreserveCachePrefix && partition.MarkerCount > 0 {
712+
stats.CachePrefixFrozen = true
713+
stats.CachePrefixTokens = partition.FrozenPrefixTokens
714+
stats.CachePrefixHash = partition.PrefixHash
715+
stats.SuffixInputCount = len(partition.Suffix)
716+
stats.SuffixOutputCount = len(representatives)
717+
}
653718

654-
s.metrics.RecordDedup("/v1/dedupe/stream", len(req.Chunks), len(representatives), clusterResult.ClusterCount)
719+
s.metrics.RecordDedup("/v1/dedupe/stream", len(req.Chunks), len(finalChunks), clusterResult.ClusterCount)
655720

656721
// Send final complete event
657722
_ = sw.SendComplete(outputChunks, stats)

0 commit comments

Comments
 (0)