Skip to content

Latest commit

 

History

History
371 lines (301 loc) · 9.44 KB

File metadata and controls

371 lines (301 loc) · 9.44 KB

Cookbook: Monitoring & Observability

Comprehensive monitoring for Valkey RAG cache systems.

Key Metrics

Cache Metrics

| Metric | Description | Target |
|--------|-------------|--------|
| cache_hit_rate | % of queries served from cache | >70% |
| cache_latency_p50 | Median cache lookup time | <5ms |
| cache_latency_p99 | 99th percentile latency | <20ms |
| cache_size | Number of cached entries | Monitor growth |
| cache_memory_usage | Memory used by cache | <80% of max |

Vector Search Metrics

| Metric | Description | Target |
|--------|-------------|--------|
| search_latency_p50 | Median search time | <10ms |
| search_latency_p99 | 99th percentile search | <50ms |
| index_size | Number of vectors | Monitor |
| recall_rate | Search accuracy | >95% |

Prometheus Metrics

Python Implementation

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics.
# Counters only ever increase; use rate() in PromQL to turn them into
# per-second rates (as the dashboard/alert expressions below do).
CACHE_HITS = Counter('rag_cache_hits_total', 'Total cache hits')
CACHE_MISSES = Counter('rag_cache_misses_total', 'Total cache misses')
# Lookup-latency histogram; buckets span 1ms..1s so both the fast path
# (<5ms target) and pathological slow lookups are resolved.
CACHE_LATENCY = Histogram(
    'rag_cache_latency_seconds',
    'Cache lookup latency',
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
# Gauge (can go up or down): current entry count, set in update_size().
CACHE_SIZE = Gauge('rag_cache_size', 'Number of cached entries')
# Distribution of similarity scores on cache hits; buckets concentrate in
# 0.7-1.0, presumably around the cache's match threshold — confirm against
# the SemanticCache configuration.
SIMILARITY_SCORE = Histogram(
    'rag_similarity_score',
    'Similarity scores for cache lookups',
    buckets=[0.7, 0.8, 0.85, 0.9, 0.92, 0.95, 0.98, 1.0]
)

class InstrumentedCache:
    """Decorator around a SemanticCache that records Prometheus metrics.

    Delegates every lookup to the wrapped cache while updating the
    module-level hit/miss counters, the latency histogram, and the
    similarity-score histogram.
    """

    def __init__(self, cache: SemanticCache):
        self.cache = cache

    async def lookup(self, query: str):
        """Time the underlying lookup and count its outcome."""
        with CACHE_LATENCY.time():
            outcome = await self.cache.lookup(query)

        if not outcome.hit:
            CACHE_MISSES.inc()
        else:
            CACHE_HITS.inc()
            # Only hits carry a meaningful similarity score.
            SIMILARITY_SCORE.observe(outcome.similarity_score)

        return outcome

    def update_size(self):
        """Push the wrapped cache's current entry count into the gauge."""
        CACHE_SIZE.set(self.cache.get_stats()['num_cached_entries'])

# Start metrics server: prometheus_client serves /metrics on port 9090
# for Prometheus to scrape.
start_http_server(9090)

Grafana Dashboard JSON

{
  "dashboard": {
    "title": "Valkey RAG Cache",
    "panels": [
      {
        "title": "Cache Hit Rate",
        "type": "gauge",
        "targets": [{
          "expr": "sum(rate(rag_cache_hits_total[5m])) / (sum(rate(rag_cache_hits_total[5m])) + sum(rate(rag_cache_misses_total[5m])))"
        }],
        "fieldConfig": {
          "defaults": {
            "min": 0,
            "max": 1,
            "thresholds": {
              "steps": [
                {"value": 0, "color": "red"},
                {"value": 0.5, "color": "yellow"},
                {"value": 0.7, "color": "green"}
              ]
            }
          }
        }
      },
      {
        "title": "Cache Latency (p50/p99)",
        "type": "timeseries",
        "targets": [
          {"expr": "histogram_quantile(0.5, rate(rag_cache_latency_seconds_bucket[5m]))", "legendFormat": "p50"},
          {"expr": "histogram_quantile(0.99, rate(rag_cache_latency_seconds_bucket[5m]))", "legendFormat": "p99"}
        ]
      },
      {
        "title": "Cache Size",
        "type": "stat",
        "targets": [{"expr": "rag_cache_size"}]
      }
    ]
  }
}

Valkey Metrics

INFO Command

# Monitor with valkey-cli.
# Keyspace hit/miss counters and command throughput:
valkey-cli INFO stats | grep -E "(keyspace_hits|keyspace_misses|total_commands)"
# Memory in use vs configured maximum, plus fragmentation ratio:
valkey-cli INFO memory | grep -E "(used_memory_human|maxmemory_human|mem_fragmentation_ratio)"
# Replication role and replica status:
valkey-cli INFO replication

Key Metrics to Export

def collect_valkey_metrics():
    """Snapshot key health metrics from the Valkey server's INFO output.

    Returns:
        A flat dict of metric name -> value, suitable for export to a
        metrics backend.

    All fields are read with .get() and a 0 default: the original mixed
    direct indexing with .get(), so a server that omits a field (e.g. no
    maxmemory configured, no replicas attached) raised KeyError mid-scrape.
    """
    info = client.info()

    return {
        # Memory
        "used_memory_bytes": info.get("used_memory", 0),
        # 0 means "no limit configured" on the server.
        "maxmemory_bytes": info.get("maxmemory", 0),
        "mem_fragmentation_ratio": info.get("mem_fragmentation_ratio", 0.0),

        # Performance
        "keyspace_hits": info.get("keyspace_hits", 0),
        "keyspace_misses": info.get("keyspace_misses", 0),
        "ops_per_sec": info.get("instantaneous_ops_per_sec", 0),

        # Connections
        "connected_clients": info.get("connected_clients", 0),
        "blocked_clients": info.get("blocked_clients", 0),

        # Replication (absent on a standalone primary with no replicas)
        "connected_slaves": info.get("connected_slaves", 0),
        "repl_backlog_size": info.get("repl_backlog_size", 0),
    }

Structured Logging

import time
from datetime import datetime

import structlog

# Emit machine-parseable JSON log lines with an ISO-8601 timestamp and a
# level field, honoring the stdlib logging level filter.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Log cache operations
async def lookup_with_logging(query: str):
    """Perform a cache lookup and emit one structured log event per call.

    Logs query length rather than the query text itself, plus hit/miss,
    similarity score, and wall-clock latency in milliseconds.
    """
    started = time.perf_counter()
    outcome = await cache.lookup(query)
    elapsed_ms = 1000 * (time.perf_counter() - started)

    logger.info(
        "cache_lookup",
        query_length=len(query),
        cache_hit=outcome.hit,
        similarity_score=outcome.similarity_score,
        latency_ms=elapsed_ms,
    )

    return outcome

Log Format

{
  "timestamp": "2024-03-10T12:00:00Z",
  "level": "info",
  "event": "cache_lookup",
  "query_length": 45,
  "cache_hit": true,
  "similarity_score": 0.94,
  "latency_ms": 3.2
}

Alerting Rules

Prometheus Alerts

groups:
  - name: rag-cache-alerts
    rules:
      # Hit rate below 50% sustained for 10m: the cache is not earning its
      # keep — check similarity threshold, TTLs, or query-distribution drift.
      - alert: LowCacheHitRate
        expr: |
          sum(rate(rag_cache_hits_total[15m])) / 
          (sum(rate(rag_cache_hits_total[15m])) + sum(rate(rag_cache_misses_total[15m]))) < 0.5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Cache hit rate is below 50%"

      # p99 lookup latency over 100ms for 5m — well past the 20ms target.
      - alert: HighCacheLatency
        expr: histogram_quantile(0.99, rate(rag_cache_latency_seconds_bucket[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Cache p99 latency exceeds 100ms"

      # Above 90% of maxmemory eviction/OOM is imminent — page someone.
      - alert: ValkeyHighMemory
        expr: valkey_memory_used_bytes / valkey_memory_max_bytes > 0.9
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Valkey memory usage above 90%"

      # Replicas more than 10s behind the primary for 2m.
      - alert: ValkeyReplicationLag
        expr: valkey_replication_lag_seconds > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Valkey replication lag exceeds 10 seconds"

Health Checks

HTTP Health Endpoint

@app.get("/health")
async def health():
    """Aggregate health endpoint: Valkey connectivity, search index, cache.

    Returns {"status": "healthy"|"unhealthy", "checks": {...}}; overall
    status is healthy only when every individual check reports
    status == "healthy".
    """
    checks = {}

    # Valkey connectivity
    try:
        client.ping()
        checks["valkey"] = {"status": "healthy"}
    except Exception as e:
        checks["valkey"] = {"status": "unhealthy", "error": str(e)}

    # Index status
    try:
        info = client.ft(index_name).info()
        checks["index"] = {
            "status": "healthy",
            "num_docs": info.get("num_docs", 0),
        }
    except Exception as e:
        checks["index"] = {"status": "unhealthy", "error": str(e)}

    # Cache stats. BUG FIX: this check previously had no "status" key, so the
    # all(...) aggregation below always reported the service "unhealthy".
    # Also guarded with try/except like the other checks, so a stats failure
    # degrades the status instead of raising a 500.
    try:
        stats = cache.get_stats()
        checks["cache"] = {
            "status": "healthy",
            "hit_rate": stats["hit_rate"],
            "entries": stats["num_cached_entries"],
        }
    except Exception as e:
        checks["cache"] = {"status": "unhealthy", "error": str(e)}

    overall = "healthy" if all(c.get("status") == "healthy" for c in checks.values()) else "unhealthy"

    return {"status": overall, "checks": checks}

Kubernetes Probes

# Restart the container when /health stops answering at all.
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 10   # allow startup before the first check
  periodSeconds: 30

# Remove the pod from Service endpoints while /health reports failure.
readinessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10

Debugging

Slow Query Log

# Enable the slow query log in valkey.conf.
# NOTE: the threshold unit is MICROseconds — 10000 us = 10 ms.
slowlog-log-slower-than 10000  # Log queries > 10ms
slowlog-max-len 128

# View the 10 most recent slow queries
valkey-cli SLOWLOG GET 10

Connection Debugging

# Monitor connections
def monitor_connections():
    """Poll client-connection stats once a minute and print them.

    Runs forever; intended as a throwaway debugging loop, not production code.
    """
    while True:
        client_info = client.info("clients")
        print(f"Connected: {client_info['connected_clients']}")
        print(f"Blocked: {client_info['blocked_clients']}")

        # Per-connection detail: remote address and idle time.
        for conn in client.client_list():
            print(f"  {conn['addr']} - idle: {conn['idle']}s")

        time.sleep(60)

Cost Tracking

class CostTracker:
    """Estimate LLM spend avoided by serving responses from the cache.

    Each cache hit is one LLM call that did not happen; each miss is one
    that did. Savings are therefore hits * cost_per_call.
    """

    def __init__(self, cost_per_llm_call: float = 0.001):
        self.cost_per_call = cost_per_llm_call
        self.cache_hits = 0
        self.cache_misses = 0

    def record_hit(self):
        """Count one query answered from the cache (LLM call avoided)."""
        self.cache_hits += 1

    def record_miss(self):
        """Count one query that fell through to the LLM."""
        self.cache_misses += 1

    def get_savings(self) -> dict:
        """Return a human-readable summary of query counts, costs, and savings."""
        total_queries = self.cache_hits + self.cache_misses
        without_cache_cost = total_queries * self.cost_per_call
        with_cache_cost = self.cache_misses * self.cost_per_call

        # Savings percent equals the hit rate, since cost scales linearly
        # with call count; guard against division by zero on an empty tracker.
        if total_queries > 0:
            pct = f"{(self.cache_hits / total_queries * 100):.1f}%"
        else:
            pct = "0%"

        return {
            "total_queries": total_queries,
            "cache_hits": self.cache_hits,
            "cost_without_cache": f"${without_cache_cost:.2f}",
            "cost_with_cache": f"${with_cache_cost:.2f}",
            "savings": f"${without_cache_cost - with_cache_cost:.2f}",
            "savings_percent": pct,
        }

Summary

  1. Instrument everything - Cache hits/misses, latencies, similarity scores
  2. Set meaningful alerts - Focus on user-impacting metrics
  3. Log contextually - Include query context for debugging
  4. Monitor costs - Track LLM API savings
  5. Regular reviews - Analyze hit rates and optimize thresholds