Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,44 @@ scrape_configs:

Import the included dashboard from `grafana/dashboard.json` or use dashboard UID `distill-overview`.

### OpenTelemetry Tracing

Distill supports distributed tracing via OpenTelemetry. Each request is traced with a root span, and each pipeline stage (embedding, clustering, selection, MMR, retrieval) is instrumented as a separate span.

Enable via `distill.yaml`:

```yaml
telemetry:
  tracing:
    enabled: true
    exporter: otlp # otlp, stdout, or none
    endpoint: localhost:4317
    sample_rate: 1.0
    insecure: true
```

Or via environment variables:

```bash
export DISTILL_TELEMETRY_TRACING_ENABLED=true
export DISTILL_TELEMETRY_TRACING_ENDPOINT=localhost:4317
```

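Spans emitted per request (a stage that does not run for a given request, such as MMR when no reranking is needed, emits no span):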

| Span | Attributes |
|------|------------|
| `distill.request` | endpoint |
| `distill.embedding` | chunk_count |
| `distill.clustering` | input_count, threshold |
| `distill.selection` | cluster_count |
| `distill.mmr` | input_count, lambda |
| `distill.retrieval` | top_k, backend |

Result attributes (`distill.result.*`) are added to the root span: input_count, output_count, cluster_count, latency_ms, reduction_ratio.

W3C Trace Context propagation is enabled by default for cross-service tracing.

## Pipeline Modules

### Compression (`pkg/compress`)
Expand Down
45 changes: 44 additions & 1 deletion cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Siddhant-K-code/distill/pkg/contextlab"
"github.com/Siddhant-K-code/distill/pkg/embedding/openai"
"github.com/Siddhant-K-code/distill/pkg/metrics"
"github.com/Siddhant-K-code/distill/pkg/telemetry"
"github.com/Siddhant-K-code/distill/pkg/types"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -95,6 +96,7 @@ type APIServer struct {
validKeys map[string]bool
hasAuth bool
metrics *metrics.Metrics
tracing *telemetry.Provider
}

func runAPI(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -139,11 +141,35 @@ func runAPI(cmd *cobra.Command, args []string) error {

m := metrics.New()

// Initialize tracing
tracingCfg := telemetry.DefaultConfig()
tracingCfg.Enabled = viper.GetBool("telemetry.tracing.enabled")
if ep := viper.GetString("telemetry.tracing.endpoint"); ep != "" {
tracingCfg.Endpoint = ep
}
if exp := viper.GetString("telemetry.tracing.exporter"); exp != "" {
tracingCfg.Exporter = exp
}
if sr := viper.GetFloat64("telemetry.tracing.sample_rate"); sr > 0 {
tracingCfg.SampleRate = sr
}

tp, err := telemetry.Init(context.Background(), tracingCfg)
if err != nil {
return fmt.Errorf("failed to initialize tracing: %w", err)
}
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = tp.Shutdown(shutdownCtx)
}()

server := &APIServer{
embedder: embedder,
validKeys: validKeys,
hasAuth: len(validKeys) > 0,
metrics: m,
tracing: tp,
}

// Setup routes
Expand Down Expand Up @@ -266,6 +292,10 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
return
}

// Start root tracing span
ctx, rootSpan := s.tracing.StartRequest(r.Context(), "/v1/dedupe")
defer rootSpan.End()

start := time.Now()

// Convert to internal types
Expand All @@ -291,16 +321,20 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
return
}

_, embSpan := s.tracing.StartEmbedding(ctx, len(chunks))
texts := make([]string, len(chunks))
for i, c := range chunks {
texts[i] = c.Text
}

embeddings, err := s.embedder.EmbedBatch(r.Context(), texts)
embeddings, err := s.embedder.EmbedBatch(ctx, texts)
if err != nil {
telemetry.RecordError(embSpan, err)
embSpan.End()
http.Error(w, fmt.Sprintf("Failed to generate embeddings: %v", err), http.StatusInternalServerError)
return
}
embSpan.End()

for i := range chunks {
chunks[i].Embedding = embeddings[i]
Expand All @@ -322,30 +356,39 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) {
}

// Cluster
_, clusterSpan := s.tracing.StartClustering(ctx, len(chunks), threshold)
clusterer := contextlab.NewClusterer(contextlab.ClusterConfig{
Threshold: threshold,
Linkage: "average",
})
clusterResult := clusterer.Cluster(chunks)
clusterSpan.End()

// Select representatives
_, selectSpan := s.tracing.StartSelection(ctx, clusterResult.ClusterCount)
selectorCfg := contextlab.DefaultSelectorConfig()
selectorCfg.Strategy = contextlab.SelectByScore
selector := contextlab.NewSelector(selectorCfg)
representatives := selector.Select(clusterResult)
selectSpan.End()

// Apply MMR if we have more representatives than target
if targetK > 0 && len(representatives) > targetK {
_, mmrSpan := s.tracing.StartMMR(ctx, len(representatives), lambda)
mmrCfg := contextlab.MMRConfig{
Lambda: lambda,
TargetK: targetK,
}
mmr := contextlab.NewMMR(mmrCfg)
representatives = mmr.Rerank(representatives)
mmrSpan.End()
}

latency := time.Since(start)

// Record result on root span
telemetry.RecordResult(rootSpan, len(req.Chunks), len(representatives), clusterResult.ClusterCount, latency)

// Build response
outputChunks := make([]DedupeChunkResponse, len(representatives))
for i, c := range representatives {
Expand Down
36 changes: 35 additions & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Siddhant-K-code/distill/pkg/embedding/openai"
"github.com/Siddhant-K-code/distill/pkg/metrics"
"github.com/Siddhant-K-code/distill/pkg/retriever"
"github.com/Siddhant-K-code/distill/pkg/telemetry"
pcretriever "github.com/Siddhant-K-code/distill/pkg/retriever/pinecone"
qdretriever "github.com/Siddhant-K-code/distill/pkg/retriever/qdrant"
"github.com/Siddhant-K-code/distill/pkg/types"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Server struct {
broker *contextlab.Broker
cfg ServerConfig
metrics *metrics.Metrics
tracing *telemetry.Provider
}

// ServerConfig holds server configuration.
Expand Down Expand Up @@ -236,6 +238,29 @@ func runServe(cmd *cobra.Command, args []string) error {

m := metrics.New()

// Initialize tracing
tracingCfg := telemetry.DefaultConfig()
tracingCfg.Enabled = viper.GetBool("telemetry.tracing.enabled")
if ep := viper.GetString("telemetry.tracing.endpoint"); ep != "" {
tracingCfg.Endpoint = ep
}
if exp := viper.GetString("telemetry.tracing.exporter"); exp != "" {
tracingCfg.Exporter = exp
}
if sr := viper.GetFloat64("telemetry.tracing.sample_rate"); sr > 0 {
tracingCfg.SampleRate = sr
}

tp, err := telemetry.Init(context.Background(), tracingCfg)
if err != nil {
return fmt.Errorf("failed to initialize tracing: %w", err)
}
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = tp.Shutdown(shutdownCtx)
}()

// Create server
server := &Server{
broker: broker,
Expand All @@ -244,6 +269,7 @@ func runServe(cmd *cobra.Command, args []string) error {
Port: port,
},
metrics: m,
tracing: tp,
}

// Setup routes
Expand Down Expand Up @@ -346,13 +372,21 @@ func (s *Server) handleRetrieve(w http.ResponseWriter, r *http.Request) {
s.broker.SetConfig(cfg)
}

// Start tracing span
ctx, rootSpan := s.tracing.StartRequest(r.Context(), "/v1/retrieve")
defer rootSpan.End()

// Execute retrieval
result, err := s.broker.Retrieve(r.Context(), retrievalReq)
result, err := s.broker.Retrieve(ctx, retrievalReq)
if err != nil {
telemetry.RecordError(rootSpan, err)
http.Error(w, fmt.Sprintf("Retrieval failed: %v", err), http.StatusInternalServerError)
return
}

// Record result on root span
telemetry.RecordResult(rootSpan, result.Stats.Retrieved, result.Stats.Returned, result.Stats.Clustered, result.Stats.TotalLatency)

// Build response
chunks := make([]ChunkResponse, len(result.Chunks))
for i, c := range result.Chunks {
Expand Down
31 changes: 22 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,32 @@ require (
github.com/pinecone-io/go-pinecone/v3 v3.1.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/qdrant/go-client v1.16.2
github.com/qdrant/go-client v1.15.2
github.com/schollz/progressbar/v3 v3.14.6
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
go.opentelemetry.io/otel v1.40.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0
go.opentelemetry.io/otel/sdk v1.40.0
go.opentelemetry.io/otel/trace v1.40.0
google.golang.org/grpc v1.78.0
google.golang.org/protobuf v1.36.11
)

require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.13.0 // indirect
Expand All @@ -50,16 +59,20 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/term v0.39.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading