Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,60 @@ KV cache for repeated context patterns (system prompts, tool definitions, boiler

- **MemoryCache** - In-memory LRU with TTL, configurable size limits (entries and bytes), background cleanup
- **PatternDetector** - Identifies cacheable content and emits `CacheAnnotation` per chunk. Use `AnnotateChunksForCache` to get a `CacheControlPlan` — up to 4 `cache_control` markers (Anthropic's limit) placed at the highest-token-count stable chunks. Auto-placement is skipped when the caller has already set markers manually.
- **PrefixPartition** - Splits a chunk slice into a frozen cache prefix and a dedup-eligible suffix. Used by the `preserve_cache_prefix` dedup option to prevent Distill from reordering chunks that appear before a `cache_control` breakpoint.
- **StabilityValidator** - Tracks prefix hashes across requests and detects dynamic content bleeding into cached prefixes. Reports instability with a likely cause and supports static text analysis for pre-flight checks.
- **RedisCache** - Interface for distributed deployments (requires external Redis)

#### Cache-aware dedup (`preserve_cache_prefix`)

Distill's dedup pipeline can reorder chunks to improve context quality. When prompt caching is active, reordering chunks before the `cache_control` breakpoint changes the prefix hash and causes a cache miss. Use `preserve_cache_prefix` to freeze the prefix:

```json
POST /v1/dedupe
{
"chunks": [
{"id": "sys", "text": "You are a helpful assistant.", "cache_control": "ephemeral"},
{"id": "tool1", "text": "Tool schema JSON...", "cache_control": "ephemeral"},
{"id": "msg1", "text": "What is the capital of France?"},
{"id": "msg2", "text": "What is the capital of Germany?"}
],
"options": {"preserve_cache_prefix": true}
}
```

Response stats when prefix is frozen:

```json
{
"stats": {
"input_count": 4, "output_count": 3,
"cache_prefix_frozen": true,
"cache_prefix_tokens": 320,
"cache_prefix_hash": "a3f2c1d4e5b6",
"suffix_input_count": 2,
"suffix_output_count": 1
}
}
```

#### Prefix stability validator

Detects dynamic content (timestamps, request IDs, UUIDs) bleeding into cached prefixes — the most common cause of 0% cache hit rates:

```go
validator := cache.NewStabilityValidator(cache.DefaultStabilityConfig())

// Runtime check — call on every request
issues := validator.Check("agent/planner.go:84", chunks)
for _, issue := range issues {
log.Warnf("%s", issue) // "cache-prefix-unstable: stability=12% — likely dynamic interpolation: request id"
}

// Static pre-flight check
found := validator.ValidateText(systemPromptText)
// found = ["request id", "timestamp"] if dynamic patterns detected
```

#### Automatic cache_control placement

```go
Expand Down Expand Up @@ -820,6 +872,8 @@ Distill is evolving from a dedup utility into a context intelligence layer. Here
| **Session-aware cache boundary manager** | [#51](https://github.com/Siddhant-K-code/distill/issues/51) | Shipped | Auto-advances `cache_control` placement as sessions grow. Stable entries (present ≥ 2 turns unmodified) are included in the cached prefix; boundary retreats when content changes. |
| **Cache write cost accounting** | [#52](https://github.com/Siddhant-K-code/distill/issues/52) | Shipped | 9 new Prometheus metrics covering Anthropic prompt cache token usage, hit rate, write efficiency, and boundary position. Feed API response usage via `RecordCacheUsage`. |
| **Memory decay lifecycle events** | [#54](https://github.com/Siddhant-K-code/distill/issues/54) | Shipped | `DecayWorker` emits `EventCompressed` and `EventEvicted` on each transition. `RecallResult` includes a `CacheBoundaryHint` for high-relevance entries. |
| **Cache-aware dedup** | [#50](https://github.com/Siddhant-K-code/distill/issues/50) | Shipped | `preserve_cache_prefix` option freezes chunks before the last `cache_control` marker so dedup cannot reorder them. Prefix hash and token count reported in stats. |
| **Prefix stability validator** | [#48](https://github.com/Siddhant-K-code/distill/issues/48) | Shipped | `StabilityValidator` tracks prefix hashes across requests and detects dynamic content (timestamps, request IDs, UUIDs) bleeding into cached prefixes. |

### Code Intelligence

Expand Down
155 changes: 110 additions & 45 deletions cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"
"time"

distillcache "github.com/Siddhant-K-code/distill/pkg/cache"
"github.com/Siddhant-K-code/distill/pkg/contextlab"
"github.com/Siddhant-K-code/distill/pkg/embedding/openai"
"github.com/Siddhant-K-code/distill/pkg/metrics"
Expand Down Expand Up @@ -61,14 +62,27 @@ type DedupeRequest struct {
Threshold float64 `json:"threshold,omitempty"`
Lambda float64 `json:"lambda,omitempty"`
TargetK int `json:"target_k,omitempty"`
Options DedupeOptions `json:"options,omitempty"`
}

// DedupeOptions controls optional dedup behaviour.
type DedupeOptions struct {
// PreserveCachePrefix freezes chunks before the last cache_control marker
// so the dedup pipeline cannot reorder or remove them. This prevents
// Distill from silently invalidating Anthropic prompt cache prefixes.
PreserveCachePrefix bool `json:"preserve_cache_prefix,omitempty"`
}

// DedupeChunk represents a chunk in the request.
type DedupeChunk struct {
ID string `json:"id"`
Text string `json:"text"`
Embedding []float32 `json:"embedding,omitempty"`
Score float32 `json:"score,omitempty"`
ID string `json:"id"`
Text string `json:"text"`
Embedding []float32 `json:"embedding,omitempty"`
Score float32 `json:"score,omitempty"`
// CacheControl mirrors the Anthropic cache_control field. When non-empty,
// this chunk is treated as a cache boundary marker. Used with
// options.preserve_cache_prefix to freeze the prefix during dedup.
CacheControl string `json:"cache_control,omitempty"`
}

// DedupeResponse is the JSON response for /v1/dedupe.
Expand All @@ -92,6 +106,13 @@ type DedupeStats struct {
ClusterCount int `json:"cluster_count"`
ReductionPct int `json:"reduction_pct"`
LatencyMs int64 `json:"latency_ms"`

// Cache prefix fields — populated when options.preserve_cache_prefix=true.
CachePrefixFrozen bool `json:"cache_prefix_frozen,omitempty"`
CachePrefixTokens int `json:"cache_prefix_tokens,omitempty"`
CachePrefixHash string `json:"cache_prefix_hash,omitempty"`
SuffixInputCount int `json:"suffix_input_count,omitempty"`
SuffixOutputCount int `json:"suffix_output_count,omitempty"`
}

// APIServer holds the API server state.
Expand Down Expand Up @@ -346,32 +367,43 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {

start := time.Now()

// Convert to internal types
// Convert to internal types, preserving cache_control metadata.
chunks := make([]types.Chunk, len(req.Chunks))
needsEmbedding := false

for i, c := range req.Chunks {
chunks[i] = types.Chunk{
ID: c.ID,
Text: c.Text,
Embedding: c.Embedding,
Score: c.Score,
Metadata: make(map[string]interface{}),
}
if c.CacheControl != "" {
chunks[i].Metadata["cache_control"] = c.CacheControl
}
if len(c.Embedding) == 0 {
needsEmbedding = true
}
}

// Generate embeddings if needed
// Partition into frozen prefix + dedup-eligible suffix when requested.
var partition distillcache.PrefixPartition
dedupChunks := chunks
if req.Options.PreserveCachePrefix {
partition = distillcache.PartitionForCacheAwareDedup(chunks)
dedupChunks = partition.Suffix
}

// Generate embeddings if needed (only for the dedup-eligible suffix).
if needsEmbedding {
if s.embedder == nil {
http.Error(w, "Embeddings required but no embedding provider configured. Either provide embeddings in request or configure OPENAI_API_KEY.", http.StatusBadRequest)
return
}

_, embSpan := s.tracing.StartEmbedding(ctx, len(chunks))
texts := make([]string, len(chunks))
for i, c := range chunks {
_, embSpan := s.tracing.StartEmbedding(ctx, len(dedupChunks))
texts := make([]string, len(dedupChunks))
for i, c := range dedupChunks {
texts[i] = c.Text
}

Expand All @@ -384,8 +416,8 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
}
embSpan.End()

for i := range chunks {
chunks[i].Embedding = embeddings[i]
for i := range dedupChunks {
dedupChunks[i].Embedding = embeddings[i]
}
}

Expand All @@ -403,13 +435,13 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
targetK = 0 // Will be set to cluster count
}

// Cluster
_, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold)
// Cluster the dedup-eligible suffix only.
_, clusterSpan := s.tracing.StartClustering(ctx, len(dedupChunks), threshold)
clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{
Threshold: threshold,
Linkage: "average",
})
clusterResult := clusterer.Cluster(chunks)
clusterResult := clusterer.Cluster(dedupChunks)
clusterSpan.End()

// Select representatives
Expand All @@ -432,14 +464,17 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
mmrSpan.End()
}

// Prepend the frozen prefix to the deduped suffix.
finalChunks := append(partition.Prefix, representatives...)

latency := time.Since(start)

// Record result on root span
telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency)
telemetry.RecordResult(rootSpan, len(req.Chunks), len(finalChunks), clusterResult.ClusterCount, latency)

// Build response
outputChunks := make([]DedupeChunkResponse, len(representatives))
for i, c := range representatives {
outputChunks := make([]DedupeChunkResponse, len(finalChunks))
for i, c := range finalChunks {
outputChunks[i] = DedupeChunkResponse{
ID: c.ID,
Text: c.Text,
Expand All @@ -450,22 +485,31 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {

reductionPct := 0
if len(req.Chunks) > 0 {
reductionPct = int((1 - float64(len(representatives))/float64(len(req.Chunks))) * 100)
reductionPct = int((1 - float64(len(finalChunks))/float64(len(req.Chunks))) * 100)
}

stats := DedupeStats{
InputCount: len(req.Chunks),
OutputCount: len(finalChunks),
ClusterCount: clusterResult.ClusterCount,
ReductionPct: reductionPct,
LatencyMs: latency.Milliseconds(),
}
if req.Options.PreserveCachePrefix && partition.MarkerCount > 0 {
stats.CachePrefixFrozen = true
stats.CachePrefixTokens = partition.FrozenPrefixTokens
stats.CachePrefixHash = partition.PrefixHash
stats.SuffixInputCount = len(partition.Suffix)
stats.SuffixOutputCount = len(representatives)
}

resp := DedupeResponse{
Chunks: outputChunks,
Stats: DedupeStats{
InputCount: len(req.Chunks),
OutputCount: len(representatives),
ClusterCount: clusterResult.ClusterCount,
ReductionPct: reductionPct,
LatencyMs: latency.Milliseconds(),
},
Stats: stats,
}

// Record dedup-specific metrics
s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(representatives), clusterResult.ClusterCount)
s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(finalChunks), clusterResult.ClusterCount)

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
Expand Down Expand Up @@ -514,23 +558,34 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {

start := time.Now()

// Convert to internal types
// Convert to internal types, preserving cache_control metadata.
chunks := make([]types.Chunk, len(req.Chunks))
needsEmbedding := false

for i, c := range req.Chunks {
chunks[i] = types.Chunk{
ID: c.ID,
Text: c.Text,
Embedding: c.Embedding,
Score: c.Score,
Metadata: make(map[string]interface{}),
}
if c.CacheControl != "" {
chunks[i].Metadata["cache_control"] = c.CacheControl
}
if len(c.Embedding) == 0 {
needsEmbedding = true
}
}

// Stage 1: Embedding
// Partition into frozen prefix + dedup-eligible suffix when requested.
var partition distillcache.PrefixPartition
dedupChunks := chunks
if req.Options.PreserveCachePrefix {
partition = distillcache.PartitionForCacheAwareDedup(chunks)
dedupChunks = partition.Suffix
}

// Stage 1: Embedding (suffix only).
if needsEmbedding {
if s.embedder == nil {
_ = sw.SendError(sse.StageEmbedding, "Embeddings required but no embedding provider configured. Either provide embeddings in request or configure OPENAI_API_KEY.")
Expand All @@ -539,9 +594,9 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {

_ = sw.SendProgress(sse.StageEmbedding, 0)

_, embSpan := s.tracing.StartEmbedding(ctx, len(chunks))
texts := make([]string, len(chunks))
for i, c := range chunks {
_, embSpan := s.tracing.StartEmbedding(ctx, len(dedupChunks))
texts := make([]string, len(dedupChunks))
for i, c := range dedupChunks {
texts[i] = c.Text
}

Expand All @@ -554,8 +609,8 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
}
embSpan.End()

for i := range chunks {
chunks[i].Embedding = embeddings[i]
for i := range dedupChunks {
dedupChunks[i].Embedding = embeddings[i]
}

_ = sw.SendProgress(sse.StageEmbedding, 1.0)
Expand All @@ -575,20 +630,20 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
targetK = 0
}

// Stage 2: Clustering
// Stage 2: Clustering (suffix only).
_ = sw.SendProgress(sse.StageClustering, 0)

_, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold)
_, clusterSpan := s.tracing.StartClustering(ctx, len(dedupChunks), threshold)
clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{
Threshold: threshold,
Linkage: "average",
})
clusterResult := clusterer.Cluster(chunks)
clusterResult := clusterer.Cluster(dedupChunks)
clusterSpan.End()

_ = sw.SendProgressWithStats(sse.StageClustering, 1.0, map[string]interface{}{
"clusters_formed": clusterResult.ClusterCount,
"input_count": len(chunks),
"input_count": len(dedupChunks),
})

// Stage 3: Selection
Expand Down Expand Up @@ -623,13 +678,16 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {
})
}

// Prepend frozen prefix to deduped suffix.
finalChunks := append(partition.Prefix, representatives...)

latency := time.Since(start)

telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency)
telemetry.RecordResult(rootSpan, len(req.Chunks), len(finalChunks), clusterResult.ClusterCount, latency)

// Build response chunks
outputChunks := make([]DedupeChunkResponse, len(representatives))
for i, c := range representatives {
outputChunks := make([]DedupeChunkResponse, len(finalChunks))
for i, c := range finalChunks {
outputChunks[i] = DedupeChunkResponse{
ID: c.ID,
Text: c.Text,
Expand All @@ -640,18 +698,25 @@ func (s *APIServer) handleDedupeStream(w http.ResponseWriter, r *http.Request) {

reductionPct := 0
if len(req.Chunks) > 0 {
reductionPct = int((1 - float64(len(representatives))/float64(len(req.Chunks))) * 100)
reductionPct = int((1 - float64(len(finalChunks))/float64(len(req.Chunks))) * 100)
}

stats := DedupeStats{
InputCount: len(req.Chunks),
OutputCount: len(representatives),
OutputCount: len(finalChunks),
ClusterCount: clusterResult.ClusterCount,
ReductionPct: reductionPct,
LatencyMs: latency.Milliseconds(),
}
if req.Options.PreserveCachePrefix && partition.MarkerCount > 0 {
stats.CachePrefixFrozen = true
stats.CachePrefixTokens = partition.FrozenPrefixTokens
stats.CachePrefixHash = partition.PrefixHash
stats.SuffixInputCount = len(partition.Suffix)
stats.SuffixOutputCount = len(representatives)
}

s.metrics.RecordDedup("/v1/dedupe/stream", len(req.Chunks), len(representatives), clusterResult.ClusterCount)
s.metrics.RecordDedup("/v1/dedupe/stream", len(req.Chunks), len(finalChunks), clusterResult.ClusterCount)

// Send final complete event
_ = sw.SendComplete(outputChunks, stats)
Expand Down
Loading
Loading