Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,48 @@ Number of currently active MCP connections.
| `transport` | string | Backend transport type |
| `connection_type` | string | `"sse"` (only present for SSE connections) |

### Rate Limit Metrics

These metrics are emitted for Redis-backed rate limit checks used by MCPServer
and VirtualMCPServer. Prometheus appends `_total` to counter names. The latency
histogram is exported with the `_seconds` unit suffix and the standard
`_bucket`, `_sum`, and `_count` series suffixes.

#### `toolhive_rate_limit_decisions` (Counter)

Total number of rate limit bucket decisions. An allowed request increments once
for every applicable bucket. A rejected request increments only for the first
bucket rejected by the atomic Redis check. Requests with no applicable bucket
do not increment this counter.

| Attribute | Type | Description |
|-----------|------|-------------|
| `namespace` | string | Kubernetes namespace associated with the server |
| `server` | string | MCPServer or VirtualMCPServer name |
| `decision` | string | `"allowed"` or `"rejected"` |
| `scope` | string | `"shared"` or `"per_user"` |
| `operation_type` | string | `"server"` or `"tool"` |

#### `toolhive_rate_limit_redis_errors` (Counter)

Total number of Redis errors encountered while checking rate limits.

| Attribute | Type | Description |
|-----------|------|-------------|
| `namespace` | string | Kubernetes namespace associated with the server |
| `server` | string | MCPServer or VirtualMCPServer name |
| `error_type` | string | `"timeout"`, `"connection"`, `"auth"`, or `"other"` |

#### `toolhive_rate_limit_check_latency` (Histogram, seconds)

Duration of each attempted atomic Redis Lua rate limit check, including failed
checks.

| Attribute | Type | Description |
|-----------|------|-------------|
| `namespace` | string | Kubernetes namespace associated with the server |
| `server` | string | MCPServer or VirtualMCPServer name |

## Span Attributes

### HTTP Attributes
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ require (
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pressly/goose/v3 v3.27.1
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.67.5
github.com/redis/go-redis/v9 v9.21.0
github.com/shirou/gopsutil/v4 v4.26.5
github.com/spf13/viper v1.21.0
Expand Down Expand Up @@ -233,8 +235,6 @@ require (
github.com/ory/x v0.0.665 // indirect
github.com/pjbgf/sha1cd v0.6.0 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/otlptranslator v1.0.0 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand Down
84 changes: 69 additions & 15 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

v1beta1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1beta1"
"github.com/stacklok/toolhive/pkg/auth"
Expand Down Expand Up @@ -65,11 +67,29 @@ func Allow(ctx context.Context, limiter Limiter, identity *auth.Identity, toolNa
// Returns a no-op limiter (always allows) when crd is nil.
// namespace and name identify the MCP server for Redis key derivation.
func NewLimiter(client redis.Cmdable, namespace, name string, crd *v1beta1.RateLimitConfig) (Limiter, error) {
return newLimiter(client, namespace, name, crd, otel.GetMeterProvider())
}

func newLimiter(
client redis.Cmdable,
namespace string,
name string,
crd *v1beta1.RateLimitConfig,
meterProvider metric.MeterProvider,
) (Limiter, error) {
if crd == nil {
return noopLimiter{}, nil
}

l := &limiter{client: client}
telemetry, err := newRateLimitTelemetry(meterProvider, namespace, name)
if err != nil {
return nil, fmt.Errorf("rate limit telemetry: %w", err)
}

l := &limiter{
client: client,
telemetry: telemetry,
}

if crd.Shared != nil {
b, err := newBucket(namespace, name, "shared", crd.Shared)
Expand Down Expand Up @@ -122,9 +142,17 @@ type bucketSpec struct {
refillPeriod time.Duration
}

// limitCheck keeps a bucket paired with its metric dimensions.
type limitCheck struct {
bucket *bucket.TokenBucket
scope string
operationType string
}

// limiter is the concrete implementation of Limiter.
type limiter struct {
client redis.Cmdable
telemetry *rateLimitTelemetry
serverBucket *bucket.TokenBucket // nil when no shared server limit
toolBuckets map[string]*bucket.TokenBucket // tool name -> shared bucket
perUserSpec *bucketSpec // nil when no server-level per-user limit
Expand All @@ -136,13 +164,21 @@ type limiter struct {
// a rejected per-tool or per-user call from draining other budgets.
func (l *limiter) Allow(ctx context.Context, toolName, userID string) (*Decision, error) {
// Collect applicable buckets in priority order.
var buckets []*bucket.TokenBucket
var checks []limitCheck
if l.serverBucket != nil {
buckets = append(buckets, l.serverBucket)
checks = append(checks, limitCheck{
bucket: l.serverBucket,
scope: rateLimitScopeShared,
operationType: rateLimitOperationServer,
})
}
if toolName != "" && l.toolBuckets != nil {
if tb, ok := l.toolBuckets[toolName]; ok {
buckets = append(buckets, tb)
checks = append(checks, limitCheck{
bucket: tb,
scope: rateLimitScopeShared,
operationType: rateLimitOperationTool,
})
}
}

Expand All @@ -157,40 +193,58 @@ func (l *limiter) Allow(ctx context.Context, toolName, userID string) (*Decision
if userID != "" {
if l.perUserSpec != nil {
s := l.perUserSpec
buckets = append(buckets, bucket.New(
s.namespace, s.serverName,
"user:"+userID,
s.maxTokens, s.refillPeriod,
))
checks = append(checks, limitCheck{
bucket: bucket.New(
s.namespace, s.serverName,
"user:"+userID,
s.maxTokens, s.refillPeriod,
),
scope: rateLimitScopePerUser,
operationType: rateLimitOperationServer,
})
}
if toolName != "" && l.perUserTools != nil {
if s, ok := l.perUserTools[toolName]; ok {
// Key prefix "user-tool:" is distinct from "user:" to prevent
// collisions when a userID contains delimiter characters.
buckets = append(buckets, bucket.New(
s.namespace, s.serverName,
"user-tool:"+toolName+":"+userID,
s.maxTokens, s.refillPeriod,
))
checks = append(checks, limitCheck{
bucket: bucket.New(
s.namespace, s.serverName,
"user-tool:"+toolName+":"+userID,
s.maxTokens, s.refillPeriod,
),
scope: rateLimitScopePerUser,
operationType: rateLimitOperationTool,
})
}
}
}

if len(buckets) == 0 {
if len(checks) == 0 {
return &Decision{Allowed: true}, nil
}

buckets := make([]*bucket.TokenBucket, len(checks))
for i, check := range checks {
buckets[i] = check.bucket
}

start := time.Now()
rejectedIdx, err := bucket.ConsumeAll(ctx, l.client, buckets)
l.telemetry.recordCheckLatency(ctx, time.Since(start))
if err != nil {
l.telemetry.recordRedisError(ctx, err)
return nil, fmt.Errorf("rate limit check: %w", err)
}
if rejectedIdx >= 0 {
l.telemetry.recordRejected(ctx, checks[rejectedIdx])
return &Decision{
Allowed: false,
RetryAfter: buckets[rejectedIdx].RetryAfter(),
}, nil
}

l.telemetry.recordAllowed(ctx, checks)
return &Decision{Allowed: true}, nil
}

Expand Down
175 changes: 175 additions & 0 deletions pkg/ratelimit/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc.
// SPDX-License-Identifier: Apache-2.0

package ratelimit

import (
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"time"

"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/stacklok/toolhive/pkg/telemetry"
)

const (
rateLimitInstrumentationName = "github.com/stacklok/toolhive/pkg/ratelimit"

rateLimitDecisionAllowed = "allowed"
rateLimitDecisionRejected = "rejected"

rateLimitScopeShared = "shared"
rateLimitScopePerUser = "per_user"

rateLimitOperationServer = "server"
rateLimitOperationTool = "tool"

redisErrorTypeTimeout = "timeout"
redisErrorTypeConnection = "connection"
redisErrorTypeAuth = "auth"
redisErrorTypeOther = "other"
)

type rateLimitTelemetry struct {
namespace string
serverName string

decisions metric.Int64Counter
redisErrors metric.Int64Counter
checkLatency metric.Float64Histogram
}

func newRateLimitTelemetry(
meterProvider metric.MeterProvider,
namespace string,
serverName string,
) (*rateLimitTelemetry, error) {
if meterProvider == nil {
return nil, nil
}

meter := meterProvider.Meter(rateLimitInstrumentationName)

decisions, err := meter.Int64Counter(
"toolhive_rate_limit_decisions",
metric.WithDescription("Total number of rate limit bucket decisions"),
)
if err != nil {
return nil, fmt.Errorf("create decision counter: %w", err)
}

redisErrors, err := meter.Int64Counter(
"toolhive_rate_limit_redis_errors",
metric.WithDescription("Total number of Redis errors during rate limit checks"),
)
if err != nil {
return nil, fmt.Errorf("create Redis error counter: %w", err)
}

checkLatency, err := meter.Float64Histogram(
"toolhive_rate_limit_check_latency",
metric.WithDescription("Duration of Redis Lua rate limit checks in seconds"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(telemetry.MCPHistogramBuckets...),
)
if err != nil {
return nil, fmt.Errorf("create check latency histogram: %w", err)
}

return &rateLimitTelemetry{
namespace: namespace,
serverName: serverName,
decisions: decisions,
redisErrors: redisErrors,
checkLatency: checkLatency,
}, nil
}

func (t *rateLimitTelemetry) recordAllowed(ctx context.Context, checks []limitCheck) {
if t == nil {
return
}
for _, check := range checks {
t.recordDecision(ctx, rateLimitDecisionAllowed, check)
}
}

func (t *rateLimitTelemetry) recordRejected(ctx context.Context, check limitCheck) {
if t == nil {
return
}
t.recordDecision(ctx, rateLimitDecisionRejected, check)
}

func (t *rateLimitTelemetry) recordDecision(ctx context.Context, decision string, check limitCheck) {
t.decisions.Add(ctx, 1, metric.WithAttributes(
attribute.String("namespace", t.namespace),
attribute.String("server", t.serverName),
attribute.String("decision", decision),
attribute.String("scope", check.scope),
attribute.String("operation_type", check.operationType),
))
}

func (t *rateLimitTelemetry) recordRedisError(ctx context.Context, err error) {
if t == nil {
return
}
t.redisErrors.Add(ctx, 1, metric.WithAttributes(
attribute.String("namespace", t.namespace),
attribute.String("server", t.serverName),
attribute.String("error_type", classifyRedisError(err)),
))
}

func (t *rateLimitTelemetry) recordCheckLatency(ctx context.Context, duration time.Duration) {
if t == nil {
return
}
t.checkLatency.Record(ctx, duration.Seconds(), metric.WithAttributes(
attribute.String("namespace", t.namespace),
attribute.String("server", t.serverName),
))
}

func classifyRedisError(err error) string {
if redis.IsAuthError(err) {
return redisErrorTypeAuth
}

if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, redis.ErrPoolTimeout) {
return redisErrorTypeTimeout
}
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return redisErrorTypeTimeout
}

if errors.Is(err, redis.ErrClosed) ||
errors.Is(err, net.ErrClosed) ||
errors.Is(err, io.EOF) ||
errors.Is(err, io.ErrUnexpectedEOF) {
return redisErrorTypeConnection
}
var opErr *net.OpError
if errors.As(err, &opErr) {
return redisErrorTypeConnection
}

message := strings.ToLower(err.Error())
if strings.Contains(message, "connection refused") ||
strings.Contains(message, "connection reset") ||
strings.Contains(message, "broken pipe") ||
strings.Contains(message, "no such host") {
return redisErrorTypeConnection
}

return redisErrorTypeOther
}
Loading
Loading