
Best Practices

Essential guidelines and recommendations for using FACT effectively in production environments.

🎯 Core Principles

1. Cache-First Architecture

Always design with caching in mind (see the sketch after this list):

  • Make queries deterministic and cacheable
  • Use consistent parameters for similar queries
  • Set appropriate TTL values for different data types
  • Monitor cache hit rates and optimize accordingly
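
A minimal sketch of the first two points, assuming a hypothetical `cache` client with `get`/`set` and a `run_query` coroutine; normalizing the query text and sorting parameters makes logically identical requests share one cache entry:

import hashlib
import json

async def cached_query(query: str, params: dict, ttl: int = 3600) -> str:
    """Run a query through the cache, normalizing inputs first so that
    logically identical requests map to the same entry."""
    # Lowercase and collapse whitespace; sort params for a stable serialization
    normalized = " ".join(query.lower().split())
    key = hashlib.sha256(
        (normalized + json.dumps(params, sort_keys=True)).encode()
    ).hexdigest()

    cached = cache.get(key)  # hypothetical cache client
    if cached is not None:
        return cached

    result = await run_query(query, params)  # hypothetical query runner
    cache.set(key, result, ttl=ttl)
    return result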

2. Error Handling

Implement comprehensive error handling (a minimal pattern follows this list):

  • Use try-catch blocks for all FACT operations
  • Implement fallback mechanisms for critical operations
  • Log errors with sufficient context for debugging
  • Provide meaningful error messages to users
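
A minimal pattern covering these points; `QueryError` and `get_fallback_result` are hypothetical placeholders for your own exception type and fallback path:

import logging

logger = logging.getLogger("fact.app")

async def safe_process_query(driver, query: str, user_id: str) -> str:
    """Process a query with contextual logging and a fallback path."""
    try:
        return await driver.process_query(query)
    except QueryError as e:  # hypothetical FACT exception type
        # Log with enough context to reproduce the failure later
        logger.error("Query failed", extra={"user_id": user_id, "query_text": query, "reason": str(e)})
        return get_fallback_result(query)  # hypothetical fallback
    except Exception:
        logger.exception("Unexpected error processing query for user %s", user_id)
        raise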

3. Performance Monitoring

Track key metrics continuously (a percentile snippet follows this list):

  • Response times (p50, p95, p99)
  • Cache hit rates
  • Memory usage
  • Query patterns and frequencies
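
Percentiles can be computed from recorded response times with the standard library alone; a small sketch:

import statistics

def latency_percentiles(samples_ms: list) -> dict:
    """Compute p50/p95/p99 from recorded response times in milliseconds."""
    if len(samples_ms) < 2:
        return {}
    # quantiles() with n=100 returns the 1st..99th percentile cut points
    q = statistics.quantiles(samples_ms, n=100)
    return {"p50": q[49], "p95": q[94], "p99": q[98]}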

🏗️ Architecture Best Practices

Multi-Tier Deployment

# Production architecture (illustrative)
production:
  load_balancer:
    type: nginx
    health_checks: true

  application_tier:
    fact_instances: 3
    max_connections: 100
    worker_processes: 4

  cache_tier:
    redis_cluster: true
    memory: 8GB
    persistence: enabled

  database_tier:
    read_replicas: 2
    connection_pool: 20
    backup_schedule: daily
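
Three stateless application replicas behind a health-checked load balancer allow rolling restarts without downtime, while the shared Redis tier keeps the cache consistent across replicas instead of fragmenting it per instance.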

Configuration Management

# config/production.py
class ProductionConfig:
    # Cache settings
    CACHE_MAX_SIZE = 50000
    CACHE_TTL = 3600  # 1 hour for stable data
    CACHE_TTL_DYNAMIC = 300  # 5 minutes for dynamic data
    
    # Performance settings
    MAX_CONCURRENT_QUERIES = 100
    QUERY_TIMEOUT = 30.0
    CONNECTION_POOL_SIZE = 20
    
    # Security settings
    ENABLE_QUERY_VALIDATION = True
    MAX_QUERY_LENGTH = 10000
    RATE_LIMIT_PER_MINUTE = 1000
    
    # Monitoring
    ENABLE_METRICS = True
    METRICS_EXPORT_INTERVAL = 60
    LOG_LEVEL = "INFO"
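
One common way to select the active configuration at startup, keyed off the same FACT_ENV variable used in the deployment section below (`DevelopmentConfig` is a hypothetical counterpart):

import os

def load_config():
    """Pick the configuration class based on the FACT_ENV variable."""
    env = os.getenv("FACT_ENV", "development")
    return ProductionConfig() if env == "production" else DevelopmentConfig()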

💾 Caching Strategies

Cache Key Design

import hashlib
import json
import re

def generate_cache_key(query: str, context: dict) -> str:
    """Generate deterministic cache keys"""
    # Normalize query text
    normalized_query = query.lower().strip()
    normalized_query = re.sub(r'\s+', ' ', normalized_query)
    
    # Extract stable context elements
    stable_context = {
        k: v for k, v in context.items()
        if k in ['user_type', 'data_version', 'locale']
    }
    
    # Create composite key
    key_components = [
        normalized_query,
        json.dumps(stable_context, sort_keys=True)
    ]
    
    return hashlib.sha256(':'.join(key_components).encode()).hexdigest()[:16]
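
Because the key ignores casing, extra whitespace, and volatile context fields, equivalent requests collapse to a single entry:

key1 = generate_cache_key("Show   Top 10 companies", {"locale": "en", "session_id": "a1"})
key2 = generate_cache_key("show top 10 companies", {"session_id": "b2", "locale": "en"})
assert key1 == key2  # session_id is not in the stable set, so it never affects the key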

Cache Warming Strategy

async def warm_cache_on_startup():
    """Warm cache with frequently used queries"""
    common_queries = [
        "Show top 10 companies by revenue",
        "List all sectors",
        "Get current market indicators",
        "Show recent IPOs"
    ]
    
    for query in common_queries:
        try:
            await driver.process_query(query, use_cache=True)
        except Exception as e:
            logger.warning(f"Failed to warm cache for query: {query}, error: {e}")

TTL Management

def get_ttl_for_query(query: str) -> int:
    """Determine appropriate TTL based on query type"""
    if 'historical' in query.lower():
        return 86400  # 24 hours for historical data
    elif 'current' in query.lower() or 'today' in query.lower():
        return 300    # 5 minutes for current data
    elif 'real-time' in query.lower():
        return 60     # 1 minute for real-time data
    else:
        return 3600   # 1 hour default
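
Wiring the TTL decision into the write path (assuming a `cache.set` that accepts a `ttl` argument):

ttl = get_ttl_for_query(query)
cache.set(cache_key, result, ttl=ttl)  # hypothetical cache API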

🔒 Security Best Practices

Input Validation

import re
from typing import Dict, Any

class QueryValidator:
    """Validate queries for security and correctness"""
    
    # Blocked patterns
    BLOCKED_PATTERNS = [
        r'(?i)(union|select|insert|update|delete|drop|exec|execute)',
        r'(?i)(script|javascript|<script)',
        r'(?i)(system|shell|cmd|powershell)',
        r'[;\'"\\]',  # SQL injection characters
    ]
    
    @classmethod
    def validate_query(cls, query: str) -> bool:
        """Validate query safety"""
        if len(query) > 10000:
            raise ValueError("Query too long")
        
        for pattern in cls.BLOCKED_PATTERNS:
            if re.search(pattern, query):
                raise ValueError(f"Query contains blocked pattern: {pattern}")
        
        return True
    
    @classmethod
    def sanitize_context(cls, context: Dict[str, Any]) -> Dict[str, Any]:
        """Sanitize context parameters"""
        sanitized = {}
        
        for key, value in context.items():
            # Only allow alphanumeric keys
            if not re.match(r'^[a-zA-Z0-9_]+$', key):
                continue
            
            # Sanitize string values
            if isinstance(value, str):
                value = re.sub(r'[<>"\']', '', value)[:1000]
            
            sanitized[key] = value
        
        return sanitized
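
Both checks belong at the request boundary, before a query ever reaches the driver:

def prepare_request(raw_query: str, raw_context: dict) -> tuple:
    """Reject unsafe queries and strip unsafe context before processing."""
    QueryValidator.validate_query(raw_query)  # raises ValueError on unsafe input
    context = QueryValidator.sanitize_context(raw_context)
    return raw_query, context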

Authentication & Authorization

from functools import wraps
import jwt  # PyJWT

# `request`, `HTTPException`, `secret_key`, and `redis_client` are assumed to be
# provided by your web framework and application setup.

def require_auth(f):
    """Decorator for authentication"""
    @wraps(f)
    async def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            raise HTTPException(401, "Authentication required")
        
        try:
            # Verify JWT token
            payload = jwt.decode(token.replace('Bearer ', ''), 
                               secret_key, algorithms=['HS256'])
            request.user = payload
        except jwt.InvalidTokenError:
            raise HTTPException(401, "Invalid token")
        
        return await f(*args, **kwargs)
    return decorated_function

def check_rate_limit(user_id: str) -> bool:
    """Check whether the user is within the per-minute rate limit"""
    key = f"rate_limit:{user_id}"
    
    # Increment first, then read the result: a separate get-then-incr
    # lets concurrent requests slip past the limit.
    count = redis_client.incr(key)
    if count == 1:
        # First request in this window: start the one-minute expiry
        redis_client.expire(key, 60)
    
    return count <= RATE_LIMIT_PER_MINUTE
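
Putting both pieces together at the handler level; this is a sketch, and the `sub` claim, `request.query`, and `HTTPException` signature follow common JWT and web-framework conventions that may differ in your stack:

@require_auth
async def handle_query(request):
    """Authenticated, rate-limited query endpoint."""
    user_id = request.user.get("sub", "anonymous")
    if not check_rate_limit(user_id):
        raise HTTPException(429, "Rate limit exceeded")
    return await driver.process_query(request.query)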

⚡ Performance Optimization

Query Optimization

from collections import defaultdict

class QueryOptimizer:
    """Optimize queries for better performance"""
    
    def __init__(self):
        self.query_patterns = {}
        self.performance_stats = defaultdict(list)
    
    def analyze_query(self, query: str) -> dict:
        """Analyze query complexity and suggest optimizations"""
        complexity_score = 0
        suggestions = []
        
        # Check query length
        if len(query) > 1000:
            complexity_score += 2
            suggestions.append("Consider breaking into smaller queries")
        
        # Check for aggregations
        if any(word in query.lower() for word in ['sum', 'count', 'average', 'group']):
            complexity_score += 1
            suggestions.append("Aggregation queries benefit from caching")
        
        # Check for time ranges
        if any(word in query.lower() for word in ['last', 'recent', 'today', 'current']):
            suggestions.append("Consider shorter TTL for time-sensitive queries")
        
        return {
            'complexity_score': complexity_score,
            'suggestions': suggestions,
            'estimated_cache_ttl': self._suggest_ttl(query)
        }
    
    def _suggest_ttl(self, query: str) -> int:
        """Suggest appropriate TTL based on query content"""
        if 'historical' in query.lower():
            return 86400  # 24 hours
        elif any(word in query.lower() for word in ['current', 'today', 'now']):
            return 300    # 5 minutes
        else:
            return 3600   # 1 hour
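
The analysis output can feed directly into logging and the caching decision:

optimizer = QueryOptimizer()
analysis = optimizer.analyze_query(query)

for hint in analysis["suggestions"]:
    logger.info("Optimizer hint: %s", hint)

# Store the result using the suggested TTL (hypothetical cache API)
cache.set(cache_key, result, ttl=analysis["estimated_cache_ttl"])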

Async Patterns

import asyncio
from asyncio import Semaphore
from typing import List

class AsyncProcessor:
    """Optimized async processing patterns"""
    
    def __init__(self, max_concurrent: int = 20):
        self.semaphore = Semaphore(max_concurrent)
        self.driver = None
    
    async def process_with_backpressure(self, queries: List[str]) -> List[dict]:
        """Process queries with backpressure control"""
        async def limited_process(query: str) -> dict:
            async with self.semaphore:
                try:
                    result = await self.driver.process_query(query)
                    return {'query': query, 'result': result, 'status': 'success'}
                except Exception as e:
                    return {'query': query, 'error': str(e), 'status': 'error'}
        
        # Process with controlled concurrency
        tasks = [limited_process(query) for query in queries]
        return await asyncio.gather(*tasks)
    
    async def process_with_timeout(self, query: str, timeout: float = 30.0) -> dict:
        """Process query with timeout"""
        try:
            result = await asyncio.wait_for(
                self.driver.process_query(query),
                timeout=timeout
            )
            return {'result': result, 'status': 'success'}
        except asyncio.TimeoutError:
            return {'error': 'Query timeout', 'status': 'timeout'}
        except Exception as e:
            return {'error': str(e), 'status': 'error'}
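
Typical usage, separating failures from successes after the batch completes:

processor = AsyncProcessor(max_concurrent=20)
processor.driver = driver  # inject an initialized FACT driver

results = await processor.process_with_backpressure(queries)
failed = [r for r in results if r["status"] == "error"]
if failed:
    logger.warning("%d of %d queries failed", len(failed), len(results))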

🔍 Monitoring & Observability

Metrics Collection

import time
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
query_counter = Counter('fact_queries_total', 'Total queries processed', ['status'])
query_duration = Histogram('fact_query_duration_seconds', 'Query processing time')
cache_hit_rate = Gauge('fact_cache_hit_rate', 'Cache hit rate')
active_connections = Gauge('fact_active_connections', 'Active connections')

class MetricsCollector:
    """Collect and export metrics"""
    
    def __init__(self):
        self.start_time = time.time()
    
    async def process_with_metrics(self, query: str) -> str:
        """Process query and collect metrics"""
        start_time = time.time()
        
        try:
            # Process query
            result = await driver.process_query(query)
            
            # Record success
            query_counter.labels(status='success').inc()
            query_duration.observe(time.time() - start_time)
            
            return result
            
        except Exception:
            # Record failure, then re-raise with the original traceback
            query_counter.labels(status='error').inc()
            raise
    
    def update_cache_metrics(self):
        """Update cache-related metrics"""
        stats = driver.cache.stats()
        # Guard on the total, not just misses, to avoid division by zero
        total = stats.hits + stats.misses
        hit_rate = stats.hits / total if total > 0 else 0.0
        cache_hit_rate.set(hit_rate)
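
prometheus_client can expose these metrics over HTTP for Prometheus to scrape, matching the monitoring stack in the deployment section below:

from prometheus_client import start_http_server

# Serve /metrics on port 9100; point Prometheus' scrape config at this port.
start_http_server(9100)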

Health Checks

import time
from dataclasses import dataclass
from typing import Dict

@dataclass
class HealthStatus:
    component: str
    status: str  # healthy, degraded, unhealthy
    message: str
    response_time_ms: float

class HealthChecker:
    """Comprehensive health checking"""
    
    async def check_cache_health(self) -> HealthStatus:
        """Check cache system health"""
        try:
            start = time.time()
            
            # Test cache operations
            test_key = "health_check_" + str(int(time.time()))
            driver.cache.set(test_key, "test_value", ttl=60)
            value = driver.cache.get(test_key)
            driver.cache.delete(test_key)
            
            response_time = (time.time() - start) * 1000
            
            if value == "test_value" and response_time < 100:
                return HealthStatus("cache", "healthy", "Cache operational", response_time)
            else:
                return HealthStatus("cache", "degraded", "Cache slow response", response_time)
                
        except Exception as e:
            return HealthStatus("cache", "unhealthy", f"Cache error: {e}", 0)
    
    async def check_query_processing(self) -> HealthStatus:
        """Check query processing health"""
        try:
            start = time.time()
            result = await driver.process_query("SELECT 1")  # Simple test query
            response_time = (time.time() - start) * 1000
            
            if response_time < 1000:  # Under 1 second
                return HealthStatus("queries", "healthy", "Query processing normal", response_time)
            else:
                return HealthStatus("queries", "degraded", "Query processing slow", response_time)
                
        except Exception as e:
            return HealthStatus("queries", "unhealthy", f"Query error: {e}", 0)
    
    async def get_overall_health(self) -> Dict:
        """Get comprehensive health status"""
        checks = [
            await self.check_cache_health(),
            await self.check_query_processing()
        ]
        
        # Determine overall status
        if all(check.status == "healthy" for check in checks):
            overall_status = "healthy"
        elif any(check.status == "unhealthy" for check in checks):
            overall_status = "unhealthy"
        else:
            overall_status = "degraded"
        
        return {
            "status": overall_status,
            "timestamp": time.time(),
            "checks": [check.__dict__ for check in checks]
        }
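
A sketch of exposing the report over HTTP, assuming FastAPI (any framework works); returning 503 for non-healthy states is what load balancers and orchestrators key on:

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
checker = HealthChecker()

@app.get("/health")
async def health() -> JSONResponse:
    report = await checker.get_overall_health()
    code = 200 if report["status"] == "healthy" else 503
    return JSONResponse(content=report, status_code=code)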

🧪 Testing Best Practices

Unit Testing

import pytest
import pytest_asyncio
import asyncio
from unittest.mock import AsyncMock, Mock, patch
from src.core.driver import FACTDriver
# Adjust this import to wherever Config is defined in your project:
from src.core.config import Config

class TestFACTDriver:
    """Comprehensive FACT driver tests"""
    
    @pytest_asyncio.fixture
    async def driver(self):
        """Create test driver instance"""
        config = Config(
            anthropic_api_key="test-key",
            cache_enabled=True,
            cache_max_size=100
        )
        driver = FACTDriver(config)
        await driver.initialize()
        yield driver
        await driver.close()
    
    @pytest.mark.asyncio
    async def test_basic_query_processing(self, driver):
        """Test basic query processing"""
        query = "What is 2 + 2?"
        result = await driver.process_query(query)
        
        assert result is not None
        assert isinstance(result, str)
        assert len(result) > 0
    
    @pytest.mark.asyncio
    async def test_cache_functionality(self, driver):
        """Test cache hit/miss behavior"""
        query = "Test query for caching"
        
        # First call - should be cache miss
        result1 = await driver.process_query(query)
        
        # Second call - should be cache hit
        result2 = await driver.process_query(query)
        
        assert result1 == result2  # Results should be identical
    
    @pytest.mark.asyncio
    async def test_concurrent_queries(self, driver):
        """Test concurrent query processing"""
        queries = [f"Test query {i}" for i in range(10)]
        
        # Process concurrently
        tasks = [driver.process_query(query) for query in queries]
        results = await asyncio.gather(*tasks)
        
        assert len(results) == len(queries)
        assert all(result is not None for result in results)
    
    @pytest.mark.asyncio
    async def test_error_handling(self, driver):
        """Test error handling for invalid queries"""
        with pytest.raises(Exception):
            await driver.process_query("")  # Empty query should raise error

Integration Testing

@pytest.mark.integration
class TestFACTIntegration:
    """Integration tests for FACT system"""
    
    @pytest.mark.asyncio
    async def test_end_to_end_workflow(self):
        """Test complete workflow from query to response"""
        driver = await get_driver()
        
        try:
            # Test realistic query
            query = "Analyze financial performance for technology companies"
            result = await driver.process_query(query)
            
            # Verify result structure
            assert result is not None
            assert len(result) > 50  # Substantial response
            
            # Test cache performance
            stats = driver.cache.stats()
            assert stats.hit_rate >= 0  # Valid hit rate
            
        finally:
            await driver.close()
    
    @pytest.mark.asyncio
    async def test_performance_requirements(self):
        """Test performance meets requirements"""
        driver = await get_driver()
        
        try:
            # Warm the cache first, then measure the cached response time:
            # the sub-100ms target applies to cache hits, not cold calls
            await driver.process_query("Simple test query")
            
            start = time.time()
            await driver.process_query("Simple test query")
            response_time = (time.time() - start) * 1000
            
            assert response_time < 100  # Sub-100ms requirement for cache hits
            
        finally:
            await driver.close()
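
Keep integration tests out of the default run: register the `integration` marker in your pytest configuration and invoke these tests explicitly with `pytest -m integration`, so the fast unit suite can run on every commit.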

📊 Production Deployment

Environment Configuration

# docker-compose.production.yml
version: '3.8'
services:
  fact-app:
    image: fact-system:latest
    environment:
      - FACT_ENV=production
      - FACT_CACHE_SIZE=50000
      - FACT_WORKERS=4
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
        reservations:
          memory: 1G
          cpus: '0.5'
  
  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 1gb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
  
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=fact
      - POSTGRES_PASSWORD=secure_password  # placeholder; use a secrets manager in production
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:

Monitoring Setup

# monitoring.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards

volumes:
  grafana_data:

🚨 Common Pitfalls to Avoid

1. Cache Key Inconsistency

Don't: Use inconsistent cache keys

# Bad - inconsistent formatting
cache.set(f"user_{user_id}", data)
cache.set(f"user-{user_id}", data)  # Different format!

Do: Use consistent cache key patterns

# Good - consistent formatting
def get_user_cache_key(user_id: str) -> str:
    return f"user:{user_id}"

2. Ignoring Error Handling

Don't: Ignore exceptions

# Bad - no error handling
result = await driver.process_query(query)

Do: Handle errors gracefully

# Good - proper error handling
try:
    result = await driver.process_query(query)
except QueryError as e:
    logger.error(f"Query failed: {e}")
    result = get_fallback_result()

3. Blocking Operations

Don't: Use blocking operations in async context

# Bad - blocking operation
result = requests.get(url)  # Blocks event loop

Do: Use async operations

# Good - non-blocking operation
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        result = await response.json()

4. Inadequate Resource Management

Don't: Forget to close resources

# Bad - resource leak
driver = await get_driver()
# Missing: await driver.close()

Do: Use context managers or ensure cleanup

# Good - proper resource management
async with get_driver() as driver:
    result = await driver.process_query(query)

Following these best practices will help you build robust, scalable, and maintainable applications with FACT.
