diff --git a/README.md b/README.md index 928b680..ec7c13f 100644 --- a/README.md +++ b/README.md @@ -375,6 +375,34 @@ Or manually: Connect your repo and set `OPENAI_API_KEY` in environment variables. +## Monitoring + +Distill exposes a Prometheus-compatible `/metrics` endpoint on both `api` and `serve` commands. + +### Metrics + +| Metric | Type | Description | +|--------|------|-------------| +| `distill_requests_total` | Counter | Total requests by endpoint and status code | +| `distill_request_duration_seconds` | Histogram | Request latency distribution | +| `distill_chunks_processed_total` | Counter | Chunks processed (input/output) | +| `distill_reduction_ratio` | Histogram | Chunk reduction ratio per request | +| `distill_active_requests` | Gauge | Currently processing requests | +| `distill_clusters_formed_total` | Counter | Clusters formed during deduplication | + +### Prometheus Scrape Config + +```yaml +scrape_configs: + - job_name: distill + static_configs: + - targets: ['localhost:8080'] +``` + +### Grafana Dashboard + +Import the included dashboard from `grafana/dashboard.json` or use dashboard UID `distill-overview`. + ## Architecture ``` diff --git a/cmd/api.go b/cmd/api.go index 53d6eb6..e593427 100644 --- a/cmd/api.go +++ b/cmd/api.go @@ -13,6 +13,7 @@ import ( "github.com/Siddhant-K-code/distill/pkg/contextlab" "github.com/Siddhant-K-code/distill/pkg/embedding/openai" + "github.com/Siddhant-K-code/distill/pkg/metrics" "github.com/Siddhant-K-code/distill/pkg/types" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -93,6 +94,7 @@ type APIServer struct { embedder *openai.Client validKeys map[string]bool hasAuth bool + metrics *metrics.Metrics } func runAPI(cmd *cobra.Command, args []string) error { @@ -135,16 +137,22 @@ func runAPI(cmd *cobra.Command, args []string) error { } } + m := metrics.New() + server := &APIServer{ embedder: embedder, validKeys: validKeys, hasAuth: len(validKeys) > 0, + metrics: m, } // Setup routes mux := http.NewServeMux() - mux.HandleFunc("/v1/dedupe", server.handleDedupe) + mux.HandleFunc("/v1/dedupe", m.Middleware("/v1/dedupe", server.handleDedupe)) mux.HandleFunc("/health", server.handleHealth) + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + m.Handler().ServeHTTP(w, r) + }) mux.HandleFunc("/", server.handleRoot) // CORS middleware @@ -186,6 +194,7 @@ func runAPI(cmd *cobra.Command, args []string) error { fmt.Println("Endpoints:") fmt.Printf(" POST http://%s/v1/dedupe\n", addr) fmt.Printf(" GET http://%s/health\n", addr) + fmt.Printf(" GET http://%s/metrics\n", addr) fmt.Println() if err := httpServer.ListenAndServe(); err != http.ErrServerClosed { @@ -219,8 +228,9 @@ func (s *APIServer) handleRoot(w http.ResponseWriter, r *http.Request) { "version": "1.0.0", "docs": "https://distill.siddhantkhare.com/docs", "endpoints": map[string]string{ - "dedupe": "POST /v1/dedupe", - "health": "GET /health", + "dedupe": "POST /v1/dedupe", + "health": "GET /health", + "metrics": "GET /metrics", }, }) } @@ -363,6 +373,9 @@ func (s *APIServer) handleDedupe(w http.ResponseWriter, r *http.Request) { }, } + // Record dedup-specific metrics + s.metrics.RecordDedup("/v1/dedupe", len(req.Chunks), len(representatives), clusterResult.ClusterCount) + w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(resp) } diff --git a/cmd/serve.go b/cmd/serve.go index 96ee643..2a40850 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -12,6 +12,7 @@ import ( "github.com/Siddhant-K-code/distill/pkg/contextlab" "github.com/Siddhant-K-code/distill/pkg/embedding/openai" + "github.com/Siddhant-K-code/distill/pkg/metrics" "github.com/Siddhant-K-code/distill/pkg/retriever" pcretriever "github.com/Siddhant-K-code/distill/pkg/retriever/pinecone" qdretriever "github.com/Siddhant-K-code/distill/pkg/retriever/qdrant" @@ -77,8 +78,9 @@ func init() { // Server holds the HTTP server state. type Server struct { - broker *contextlab.Broker - cfg ServerConfig + broker *contextlab.Broker + cfg ServerConfig + metrics *metrics.Metrics } // ServerConfig holds server configuration. @@ -232,6 +234,8 @@ func runServe(cmd *cobra.Command, args []string) error { } defer func() { _ = broker.Close() }() + m := metrics.New() + // Create server server := &Server{ broker: broker, @@ -239,13 +243,16 @@ func runServe(cmd *cobra.Command, args []string) error { Host: host, Port: port, }, + metrics: m, } // Setup routes mux := http.NewServeMux() - mux.HandleFunc("/v1/retrieve", server.handleRetrieve) + mux.HandleFunc("/v1/retrieve", m.Middleware("/v1/retrieve", server.handleRetrieve)) mux.HandleFunc("/health", server.handleHealth) - mux.HandleFunc("/metrics", server.handleMetrics) + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + m.Handler().ServeHTTP(w, r) + }) // Create HTTP server addr := fmt.Sprintf("%s:%d", host, port) @@ -370,6 +377,9 @@ func (s *Server) handleRetrieve(w http.ResponseWriter, r *http.Request) { }, } + // Record dedup-specific metrics + s.metrics.RecordDedup("/v1/retrieve", result.Stats.Retrieved, result.Stats.Returned, result.Stats.Clustered) + w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(resp) } @@ -378,17 +388,3 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) } - -func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { - cfg := s.broker.GetConfig() - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]interface{}{ - "config": map[string]interface{}{ - "over_fetch_k": cfg.OverFetchK, - "target_k": cfg.TargetK, - "threshold": cfg.ClusterThreshold, - "lambda": cfg.MMRLambda, - "mmr_enabled": cfg.EnableMMR, - }, - }) -} diff --git a/go.mod b/go.mod index 38ab0e8..04d7563 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ toolchain go1.24.11 require ( github.com/mark3labs/mcp-go v0.43.2 github.com/pinecone-io/go-pinecone/v3 v3.1.0 + github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/qdrant/go-client v1.16.2 github.com/schollz/progressbar/v3 v3.14.6 github.com/spf13/cobra v1.8.1 @@ -18,7 +20,9 @@ require ( require ( github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/google/uuid v1.6.0 // indirect @@ -29,9 +33,12 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -45,6 +52,7 @@ require ( github.com/yosida95/uritemplate/v3 v3.0.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/sys v0.38.0 // indirect diff --git a/go.sum b/go.sum index d9d2e6d..4590d02 100644 --- a/go.sum +++ b/go.sum @@ -3,9 +3,13 @@ github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7D github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -34,10 +38,14 @@ github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uO github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -49,6 +57,8 @@ github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2Em github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= @@ -58,12 +68,20 @@ github.com/pinecone-io/go-pinecone/v3 v3.1.0/go.mod h1:v8VJwwmZFesCP3bIYv98eU/kI github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/qdrant/go-client v1.16.2 h1:UUMJJfvXTByhwhH1DwWdbkhZ2cTdvSqVkXSIfBrVWSg= github.com/qdrant/go-client v1.16.2/go.mod h1:I+EL3h4HRoRTeHtbfOd/4kDXwCukZfkd41j/9wryGkw= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= @@ -115,8 +133,12 @@ go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJr go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= @@ -141,8 +163,8 @@ google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94U google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/grafana/dashboard.json b/grafana/dashboard.json new file mode 100644 index 0000000..0ae5119 --- /dev/null +++ b/grafana/dashboard.json @@ -0,0 +1,203 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "links": [], + "panels": [ + { + "gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 }, + "id": 1, + "title": "Request Rate", + "type": "stat", + "targets": [ + { + "expr": "sum(rate(distill_requests_total[5m]))", + "legendFormat": "req/s" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps" + } + } + }, + { + "gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 }, + "id": 2, + "title": "Error Rate", + "type": "stat", + "targets": [ + { + "expr": "sum(rate(distill_requests_total{status=~\"4..|5..\"}[5m])) / sum(rate(distill_requests_total[5m])) * 100", + "legendFormat": "errors %" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "thresholds": { + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 1 }, + { "color": "red", "value": 5 } + ] + } + } + } + }, + { + "gridPos": { "h": 4, "w": 6, "x": 12, "y": 0 }, + "id": 3, + "title": "P99 Latency", + "type": "stat", + "targets": [ + { + "expr": "histogram_quantile(0.99, sum(rate(distill_request_duration_seconds_bucket[5m])) by (le))", + "legendFormat": "p99" + } + ], + "fieldConfig": { + "defaults": { + "unit": "s" + } + } + }, + { + "gridPos": { "h": 4, "w": 6, "x": 18, "y": 0 }, + "id": 4, + "title": "Active Requests", + "type": "stat", + "targets": [ + { + "expr": "distill_active_requests", + "legendFormat": "active" + } + ] + }, + { + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 }, + "id": 5, + "title": "Request Rate by Endpoint", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(distill_requests_total[5m])) by (endpoint)", + "legendFormat": "{{endpoint}}" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps" + } + } + }, + { + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 }, + "id": 6, + "title": "Request Latency (p50, p95, p99)", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(distill_request_duration_seconds_bucket[5m])) by (le, endpoint))", + "legendFormat": "p50 {{endpoint}}" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(distill_request_duration_seconds_bucket[5m])) by (le, endpoint))", + "legendFormat": "p95 {{endpoint}}" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(distill_request_duration_seconds_bucket[5m])) by (le, endpoint))", + "legendFormat": "p99 {{endpoint}}" + } + ], + "fieldConfig": { + "defaults": { + "unit": "s" + } + } + }, + { + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 }, + "id": 7, + "title": "Chunks Processed", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(distill_chunks_processed_total[5m])) by (direction)", + "legendFormat": "{{direction}}" + } + ], + "fieldConfig": { + "defaults": { + "unit": "cps" + } + } + }, + { + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 12 }, + "id": 8, + "title": "Reduction Ratio Distribution", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(distill_reduction_ratio_bucket[5m])) by (le))", + "legendFormat": "p50" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(distill_reduction_ratio_bucket[5m])) by (le))", + "legendFormat": "p95" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "min": 0, + "max": 1 + } + } + }, + { + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 20 }, + "id": 9, + "title": "Clusters Formed Rate", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(distill_clusters_formed_total[5m])) by (endpoint)", + "legendFormat": "{{endpoint}}" + } + ] + }, + { + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 20 }, + "id": 10, + "title": "Requests by Status Code", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(distill_requests_total[5m])) by (status)", + "legendFormat": "{{status}}" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps" + } + } + } + ], + "schemaVersion": 39, + "tags": ["distill"], + "templating": { + "list": [] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "title": "Distill", + "uid": "distill-overview" +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..3bde0ab --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,141 @@ +// Package metrics provides Prometheus instrumentation for Distill. +package metrics + +import ( + "net/http" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Metrics holds all Prometheus metric collectors for Distill. +type Metrics struct { + RequestsTotal *prometheus.CounterVec + RequestDuration *prometheus.HistogramVec + ChunksProcessed *prometheus.CounterVec + ReductionRatio *prometheus.HistogramVec + ActiveRequests prometheus.Gauge + ClustersFormed *prometheus.CounterVec + + registry *prometheus.Registry +} + +// New creates and registers all Distill metrics. +func New() *Metrics { + reg := prometheus.NewRegistry() + + // Include default Go and process collectors + reg.MustRegister(collectors.NewGoCollector()) + reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + m := &Metrics{ + RequestsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "distill_requests_total", + Help: "Total HTTP requests by endpoint and status code.", + }, + []string{"endpoint", "status"}, + ), + RequestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "distill_request_duration_seconds", + Help: "HTTP request latency distribution.", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5}, + }, + []string{"endpoint"}, + ), + ChunksProcessed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "distill_chunks_processed_total", + Help: "Total chunks processed by direction (input/output).", + }, + []string{"direction"}, + ), + ReductionRatio: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "distill_reduction_ratio", + Help: "Chunk reduction ratio per request (0=no reduction, 1=all removed).", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, + }, + []string{"endpoint"}, + ), + ActiveRequests: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "distill_active_requests", + Help: "Number of requests currently being processed.", + }, + ), + ClustersFormed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "distill_clusters_formed_total", + Help: "Total clusters formed during deduplication.", + }, + []string{"endpoint"}, + ), + registry: reg, + } + + reg.MustRegister( + m.RequestsTotal, + m.RequestDuration, + m.ChunksProcessed, + m.ReductionRatio, + m.ActiveRequests, + m.ClustersFormed, + ) + + return m +} + +// Handler returns an http.Handler that serves the /metrics endpoint. +func (m *Metrics) Handler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}) +} + +// RecordRequest records a completed request's metrics. +func (m *Metrics) RecordRequest(endpoint string, statusCode int, duration time.Duration) { + status := strconv.Itoa(statusCode) + m.RequestsTotal.WithLabelValues(endpoint, status).Inc() + m.RequestDuration.WithLabelValues(endpoint).Observe(duration.Seconds()) +} + +// RecordDedup records deduplication-specific metrics. +func (m *Metrics) RecordDedup(endpoint string, inputCount, outputCount, clusterCount int) { + m.ChunksProcessed.WithLabelValues("input").Add(float64(inputCount)) + m.ChunksProcessed.WithLabelValues("output").Add(float64(outputCount)) + m.ClustersFormed.WithLabelValues(endpoint).Add(float64(clusterCount)) + + if inputCount > 0 { + ratio := 1.0 - float64(outputCount)/float64(inputCount) + m.ReductionRatio.WithLabelValues(endpoint).Observe(ratio) + } +} + +// Middleware returns an HTTP middleware that instruments requests. +func (m *Metrics) Middleware(endpoint string, next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + m.ActiveRequests.Inc() + defer m.ActiveRequests.Dec() + + rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK} + start := time.Now() + + next.ServeHTTP(rw, r) + + m.RecordRequest(endpoint, rw.statusCode, time.Since(start)) + } +} + +// responseWriter wraps http.ResponseWriter to capture the status code. +type responseWriter struct { + http.ResponseWriter + statusCode int +} + +func (rw *responseWriter) WriteHeader(code int) { + rw.statusCode = code + rw.ResponseWriter.WriteHeader(code) +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 0000000..62b3722 --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,178 @@ +package metrics + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func TestNew(t *testing.T) { + m := New() + if m == nil { + t.Fatal("New() returned nil") + } + if m.registry == nil { + t.Fatal("registry is nil") + } +} + +func TestRecordRequest(t *testing.T) { + m := New() + m.RecordRequest("/v1/dedupe", 200, 50*time.Millisecond) + m.RecordRequest("/v1/dedupe", 200, 100*time.Millisecond) + m.RecordRequest("/v1/dedupe", 400, 5*time.Millisecond) + + // Check counter + val := counterValue(t, m.RequestsTotal, "endpoint", "/v1/dedupe", "status", "200") + if val != 2 { + t.Errorf("expected 2 requests with status 200, got %f", val) + } + + val = counterValue(t, m.RequestsTotal, "endpoint", "/v1/dedupe", "status", "400") + if val != 1 { + t.Errorf("expected 1 request with status 400, got %f", val) + } +} + +func TestRecordDedup(t *testing.T) { + m := New() + m.RecordDedup("/v1/dedupe", 10, 6, 6) + + inputVal := counterValue(t, m.ChunksProcessed, "direction", "input") + if inputVal != 10 { + t.Errorf("expected 10 input chunks, got %f", inputVal) + } + + outputVal := counterValue(t, m.ChunksProcessed, "direction", "output") + if outputVal != 6 { + t.Errorf("expected 6 output chunks, got %f", outputVal) + } + + clusterVal := counterValue(t, m.ClustersFormed, "endpoint", "/v1/dedupe") + if clusterVal != 6 { + t.Errorf("expected 6 clusters, got %f", clusterVal) + } +} + +func TestRecordDedup_ZeroInput(t *testing.T) { + m := New() + // Should not panic on zero input + m.RecordDedup("/v1/dedupe", 0, 0, 0) +} + +func TestMiddleware(t *testing.T) { + m := New() + + handler := m.Middleware("/v1/dedupe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + req := httptest.NewRequest(http.MethodPost, "/v1/dedupe", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + + val := counterValue(t, m.RequestsTotal, "endpoint", "/v1/dedupe", "status", "200") + if val != 1 { + t.Errorf("expected 1 request recorded, got %f", val) + } +} + +func TestMiddleware_ErrorStatus(t *testing.T) { + m := New() + + handler := m.Middleware("/v1/dedupe", func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "bad request", http.StatusBadRequest) + }) + + req := httptest.NewRequest(http.MethodPost, "/v1/dedupe", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + val := counterValue(t, m.RequestsTotal, "endpoint", "/v1/dedupe", "status", "400") + if val != 1 { + t.Errorf("expected 1 request with status 400, got %f", val) + } +} + +func TestHandler(t *testing.T) { + m := New() + m.RecordRequest("/v1/dedupe", 200, 10*time.Millisecond) + + req := httptest.NewRequest(http.MethodGet, "/metrics", nil) + rec := httptest.NewRecorder() + m.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + + body := rec.Body.String() + if !strings.Contains(body, "distill_requests_total") { + t.Error("metrics output missing distill_requests_total") + } + if !strings.Contains(body, "distill_request_duration_seconds") { + t.Error("metrics output missing distill_request_duration_seconds") + } + if !strings.Contains(body, "go_goroutines") { + t.Error("metrics output missing go runtime metrics") + } +} + +func TestActiveRequests(t *testing.T) { + m := New() + + started := make(chan struct{}) + release := make(chan struct{}) + + handler := m.Middleware("/v1/dedupe", func(w http.ResponseWriter, r *http.Request) { + close(started) + <-release + w.WriteHeader(http.StatusOK) + }) + + go func() { + req := httptest.NewRequest(http.MethodPost, "/v1/dedupe", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + }() + + <-started + + var metric dto.Metric + if err := m.ActiveRequests.Write(&metric); err != nil { + t.Fatalf("failed to read gauge: %v", err) + } + if metric.GetGauge().GetValue() != 1 { + t.Errorf("expected 1 active request, got %f", metric.GetGauge().GetValue()) + } + + close(release) +} + +// counterValue extracts the value of a counter with the given label pairs. +func counterValue(t *testing.T, cv *prometheus.CounterVec, labelPairs ...string) float64 { + t.Helper() + labels := prometheus.Labels{} + for i := 0; i < len(labelPairs); i += 2 { + labels[labelPairs[i]] = labelPairs[i+1] + } + counter, err := cv.GetMetricWith(labels) + if err != nil { + t.Fatalf("failed to get metric: %v", err) + } + var metric dto.Metric + if err := counter.Write(&metric); err != nil { + t.Fatalf("failed to write metric: %v", err) + } + return metric.GetCounter().GetValue() +}