Caching Strategy
Comprehensive guide to FACT's intelligent caching system and optimization strategies for maximum performance.
FACT's caching system is designed to achieve sub-100ms response times through a multi-tier, intelligent caching architecture that adapts to usage patterns and optimizes for both speed and efficiency.
| Metric | Target | Typical | Best Case |
|---|---|---|---|
| Cache Hit Latency | <50ms | 25ms | 10ms |
| Cache Miss Latency | <140ms | 100ms | 75ms |
| Hit Rate | >85% | 90% | 99% |
| Memory Efficiency | <500MB | 300MB | 200MB |
```
┌─────────────────────────────────────────┐
│            L1: Memory Cache             │ ← Fastest (1-5ms)
│  Hot data • Small size • High hit rate  │
└─────────────┬───────────────────────────┘
              │
┌─────────────▼───────────────────────────┐
│             L2: Disk Cache              │ ← Fast (5-20ms)
│  Warm data • Medium size • Med hit rate │
└─────────────┬───────────────────────────┘
              │
┌─────────────▼───────────────────────────┐
│            L3: Remote Cache             │ ← Good (20-50ms)
│  Cold data • Large size • Low hit rate  │
└─────────────────────────────────────────┘
```
```python
from typing import Any, Optional


class MultiTierCache:
    def __init__(self, config: CacheConfig):
        # L1: In-memory LRU cache
        self.l1 = LRUCache(
            max_size=config.l1_size,
            ttl=config.l1_ttl
        )
        # L2: Disk-based cache
        self.l2 = DiskCache(
            max_size=config.l2_size,
            ttl=config.l2_ttl,
            storage_path=config.disk_path
        )
        # L3: Distributed cache (Redis)
        self.l3 = RedisCache(
            url=config.redis_url,
            ttl=config.l3_ttl
        )
        self.stats = CacheStats()

    async def get(self, key: str) -> Optional[Any]:
        # Try L1 first
        value = self.l1.get(key)
        if value is not None:
            self.stats.record_hit('l1')
            return value

        # Try L2
        value = await self.l2.get(key)
        if value is not None:
            self.stats.record_hit('l2')
            # Promote to L1
            self.l1.set(key, value)
            return value

        # Try L3
        value = await self.l3.get(key)
        if value is not None:
            self.stats.record_hit('l3')
            # Promote to L2 and L1
            await self.l2.set(key, value)
            self.l1.set(key, value)
            return value

        self.stats.record_miss()
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        # Store in all tiers
        self.l1.set(key, value, ttl)
        await self.l2.set(key, value, ttl)
        await self.l3.set(key, value, ttl)
```
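The snippet above assumes helper classes (CacheConfig, LRUCache, DiskCache, RedisCache, CacheStats) provided elsewhere in FACT. As a rough sketch of how the configuration object and wiring might look, with field names mirroring the attributes read above but purely illustrative rather than FACT's actual API:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CacheConfig:
    # Illustrative fields matching what MultiTierCache reads above
    l1_size: int = 10_000
    l1_ttl: int = 300
    l2_size: int = 50_000
    l2_ttl: int = 1_800
    l3_ttl: int = 3_600
    disk_path: str = "/var/cache/fact"
    redis_url: str = "redis://localhost:6379/0"


async def main():
    cache = MultiTierCache(CacheConfig())
    await cache.set("v1:query:example", {"answer": 42}, ttl=600)
    print(await cache.get("v1:query:example"))  # repeat lookups are served from L1


asyncio.run(main())
```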
```python
import hashlib
import json
from typing import Dict, Any


class CacheKeyGenerator:
    """Generate deterministic, collision-resistant cache keys"""

    @staticmethod
    def generate_query_key(
        query: str,
        context: Dict[str, Any] = None,
        version: str = "v1"
    ) -> str:
        """Generate cache key for queries"""
        # Normalize query
        normalized_query = CacheKeyGenerator._normalize_query(query)

        # Extract stable context
        stable_context = CacheKeyGenerator._extract_stable_context(context or {})

        # Create key components
        components = [
            version,
            "query",
            hashlib.sha256(normalized_query.encode()).hexdigest()[:16],
            hashlib.sha256(
                json.dumps(stable_context, sort_keys=True).encode()
            ).hexdigest()[:8]
        ]
        return ":".join(components)

    @staticmethod
    def _normalize_query(query: str) -> str:
        """Normalize query text for consistent keys"""
        import re

        # Convert to lowercase
        normalized = query.lower().strip()
        # Normalize whitespace
        normalized = re.sub(r'\s+', ' ', normalized)
        # Remove punctuation variations
        normalized = re.sub(r'[.,;!?]+', '', normalized)
        return normalized

    @staticmethod
    def _extract_stable_context(context: Dict[str, Any]) -> Dict[str, Any]:
        """Extract only stable context elements"""
        stable_keys = {
            'user_type', 'data_version', 'locale', 'format',
            'sector', 'time_period', 'aggregation_level'
        }
        return {
            k: v for k, v in context.items()
            if k in stable_keys and v is not None
        }
```
```python
# Query-based keys
query_key = "v1:query:a1b2c3d4:x9y8z7w6"

# Template-based keys
template_key = "v1:template:analysis-basic:data_hash"

# User-specific keys
user_key = "v1:user:123:preferences"

# Time-based keys (for time-sensitive data)
time_key = "v1:market:2025-01-31:summary"
```
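A quick usage sketch of the generator above; the exact hash characters will differ, but the shape matches the query-based key format:

```python
key = CacheKeyGenerator.generate_query_key(
    "Show top 10 companies by market cap",
    context={"sector": "technology", "locale": "en-US", "session_id": "abc"}
)
# session_id is not in the stable-key set, so it does not affect the key
print(key)  # e.g. "v1:query:<16 hex chars>:<8 hex chars>"
```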
"""Intelligent TTL assignment based on content type"""
TTL_RULES = {
# Static/historical data - long TTL
'historical': 86400, # 24 hours
'reference': 43200, # 12 hours
'company_info': 21600, # 6 hours
# Semi-static data - medium TTL
'quarterly': 3600, # 1 hour
'monthly': 1800, # 30 minutes
'weekly': 900, # 15 minutes
# Dynamic data - short TTL
'daily': 300, # 5 minutes
'current': 60, # 1 minute
'realtime': 30, # 30 seconds
}
@classmethod
def get_ttl(cls, query: str, context: Dict[str, Any] = None) -> int:
"""Determine TTL based on query content and context"""
# Check for explicit TTL in context
if context and 'ttl' in context:
return context['ttl']
query_lower = query.lower()
# Time-based keywords
if any(word in query_lower for word in ['historical', 'past', 'archive']):
return cls.TTL_RULES['historical']
if any(word in query_lower for word in ['current', 'today', 'now']):
return cls.TTL_RULES['current']
if any(word in query_lower for word in ['real-time', 'live', 'streaming']):
return cls.TTL_RULES['realtime']
if any(word in query_lower for word in ['quarterly', 'q1', 'q2', 'q3', 'q4']):
return cls.TTL_RULES['quarterly']
# Default TTL
return 3600 # 1 hour
@classmethod
def adjust_ttl_for_volatility(cls, base_ttl: int, volatility: float) -> int:
"""Adjust TTL based on data volatility"""
# Higher volatility = shorter TTL
adjustment_factor = max(0.1, 1.0 - volatility)
return int(base_ttl * adjustment_factor)class CacheWarmer:
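Combining the keyword rules with the volatility adjustment might look like the following; the 0.7 volatility figure is purely illustrative:

```python
ttl = TTLManager.get_ttl("Show today's most active stocks")      # matches 'today' -> 60s
ttl = TTLManager.adjust_ttl_for_volatility(ttl, volatility=0.7)  # 60 * (1.0 - 0.7)
print(ttl)  # 18 seconds
```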
"""Proactive cache warming based on usage patterns"""
def __init__(self, driver: FACTDriver):
self.driver = driver
self.usage_patterns = UsagePatternAnalyzer()
self.warming_queue = asyncio.Queue()
async def start_warming_service(self):
"""Start background cache warming service"""
await asyncio.gather(
self._usage_monitor(),
self._warming_worker(),
self._scheduled_warming()
)
async def _usage_monitor(self):
"""Monitor query patterns for warming candidates"""
while True:
# Analyze recent query patterns
patterns = await self.usage_patterns.get_trending_queries(
window_hours=24,
min_frequency=5
)
# Queue high-frequency queries for warming
for pattern in patterns:
if not await self._is_cached(pattern.normalized_query):
await self.warming_queue.put({
'query': pattern.query,
'priority': pattern.frequency,
'estimated_ttl': pattern.avg_ttl
})
await asyncio.sleep(300) # Check every 5 minutes
async def _warming_worker(self):
"""Process cache warming queue"""
while True:
try:
item = await self.warming_queue.get()
# Warm cache with low priority
await self._warm_query(
item['query'],
ttl=item['estimated_ttl']
)
self.warming_queue.task_done()
except Exception as e:
logger.warning(f"Cache warming failed: {e}")
async def _scheduled_warming(self):
"""Scheduled warming for known important queries"""
important_queries = [
"Show top 10 companies by market cap",
"List all active sectors",
"Get current market indicators",
"Show today's most active stocks"
]
while True:
# Warm important queries every hour
for query in important_queries:
try:
await self._warm_query(query, ttl=3600)
except Exception as e:
logger.warning(f"Scheduled warming failed for '{query}': {e}")
await asyncio.sleep(3600) # Every hour
async def _warm_query(self, query: str, ttl: int):
"""Warm cache for a specific query"""
try:
# Process query with background priority
await self.driver.process_query(
query,
context={'background_warming': True},
use_cache=True
)
logger.debug(f"Warmed cache for query: {query[:50]}...")
except Exception as e:
logger.warning(f"Failed to warm query '{query}': {e}")class DependencyWarmer:
"""Warm related queries based on dependencies"""
DEPENDENCY_RULES = {
# If user queries company info, warm related data
'company_query': [
"Get {company} financial metrics",
"Show {company} sector performance",
"List {company} competitors"
],
# If user queries sector, warm company data
'sector_query': [
"List top companies in {sector}",
"Show {sector} market trends",
"Get {sector} performance metrics"
]
}
async def warm_dependencies(self, query: str, context: Dict[str, Any]):
"""Warm dependent queries based on current query"""
# Extract entities from query
entities = self._extract_entities(query)
# Generate dependent queries
dependent_queries = []
if entities.get('company'):
templates = self.DEPENDENCY_RULES['company_query']
dependent_queries.extend([
template.format(company=entities['company'])
for template in templates
])
if entities.get('sector'):
templates = self.DEPENDENCY_RULES['sector_query']
dependent_queries.extend([
template.format(sector=entities['sector'])
for template in templates
])
# Warm dependent queries asynchronously
warming_tasks = [
self._warm_query_background(dep_query)
for dep_query in dependent_queries
]
# Fire and forget
asyncio.create_task(asyncio.gather(*warming_tasks, return_exceptions=True))class ScoredLRUCache:
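The class above relies on _extract_entities and _warm_query_background, which are not shown here. A naive, purely illustrative stand-in for the entity extraction step (a real implementation would likely use FACT's metadata or an NER pass) might be:

```python
from typing import Dict


class KeywordEntityExtractor:
    """Naive keyword matcher; illustrative only, not FACT's actual extractor."""

    KNOWN_COMPANIES = {"acme corp", "globex", "initech"}     # illustrative lookup tables
    KNOWN_SECTORS = {"technology", "healthcare", "energy"}

    def extract(self, query: str) -> Dict[str, str]:
        entities: Dict[str, str] = {}
        query_lower = query.lower()
        for company in self.KNOWN_COMPANIES:
            if company in query_lower:
                entities['company'] = company
                break
        for sector in self.KNOWN_SECTORS:
            if sector in query_lower:
                entities['sector'] = sector
                break
        return entities
```

DependencyWarmer._extract_entities could delegate to something like this extractor.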
"""LRU cache with intelligent scoring for eviction decisions"""
def __init__(self, max_size: int):
self.max_size = max_size
self.cache = OrderedDict()
self.scores = {}
self.access_counts = {}
self.creation_times = {}
def _calculate_score(self, key: str) -> float:
"""Calculate eviction score (higher = keep longer)"""
now = time.time()
# Base score components
access_frequency = self.access_counts.get(key, 0)
recency = now - self.creation_times.get(key, now)
# Frequency score (logarithmic to prevent dominance)
freq_score = math.log(access_frequency + 1) * 10
# Recency score (inverse of age)
recency_score = max(0, 100 - (recency / 3600)) # Decay over hours
# Size penalty (smaller items score higher)
size_penalty = len(str(self.cache.get(key, ""))) / 1000
# Cost-to-recreate bonus (expensive queries score higher)
cost_bonus = self._estimate_recreation_cost(key) * 5
total_score = freq_score + recency_score - size_penalty + cost_bonus
return max(0, total_score)
def _estimate_recreation_cost(self, key: str) -> float:
"""Estimate cost to recreate this cache entry"""
# Parse key to understand query complexity
if 'aggregate' in key or 'analyze' in key:
return 3.0 # Complex queries are expensive
elif 'simple' in key or 'basic' in key:
return 1.0 # Simple queries are cheap
else:
return 2.0 # Default medium cost
def _evict_candidates(self, count: int = 1):
"""Evict lowest-scoring entries"""
# Calculate scores for all entries
scored_items = [
(key, self._calculate_score(key))
for key in self.cache.keys()
]
# Sort by score (ascending - lowest first)
scored_items.sort(key=lambda x: x[1])
# Evict lowest-scoring items
for key, score in scored_items[:count]:
self._remove_entry(key)
logger.debug(f"Evicted cache entry: {key} (score: {score:.2f})")
def set(self, key: str, value: Any, ttl: Optional[int] = None):
"""Set value with intelligent eviction"""
now = time.time()
# Remove if exists (to update position)
if key in self.cache:
del self.cache[key]
# Check if eviction needed
if len(self.cache) >= self.max_size:
self._evict_candidates(count=max(1, int(self.max_size * 0.1)))
# Add new entry
self.cache[key] = {
'value': value,
'ttl': ttl,
'created_at': now,
'expires_at': now + ttl if ttl else None
}
# Update metadata
self.access_counts[key] = 1
self.creation_times[key] = now
self.scores[key] = self._calculate_score(key)class CacheAnalytics:
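The snippet above only shows the write and eviction path. A matching read path that bumps access counts and honors expires_at could look roughly like this method (an assumption, not shown in the source), added to ScoredLRUCache:

```python
def get(self, key: str) -> Optional[Any]:
    """Return a value if present and unexpired, updating recency and frequency."""
    entry = self.cache.get(key)
    if entry is None:
        return None
    if entry['expires_at'] is not None and time.time() > entry['expires_at']:
        self._remove_entry(key)          # lazily expire stale entries
        return None
    self.cache.move_to_end(key)          # refresh LRU position
    self.access_counts[key] = self.access_counts.get(key, 0) + 1
    return entry['value']
```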
"""Comprehensive cache performance analytics"""
def __init__(self):
self.metrics = {
'hits': {'l1': 0, 'l2': 0, 'l3': 0},
'misses': 0,
'evictions': 0,
'response_times': [],
'memory_usage': [],
'key_patterns': defaultdict(int)
}
def record_hit(self, tier: str, response_time: float):
"""Record cache hit"""
self.metrics['hits'][tier] += 1
self.metrics['response_times'].append(response_time)
def record_miss(self, response_time: float):
"""Record cache miss"""
self.metrics['misses'] += 1
self.metrics['response_times'].append(response_time)
def get_performance_report(self) -> Dict[str, Any]:
"""Generate comprehensive performance report"""
total_hits = sum(self.metrics['hits'].values())
total_requests = total_hits + self.metrics['misses']
if total_requests == 0:
return {'error': 'No data available'}
response_times = self.metrics['response_times']
return {
'hit_rate': {
'overall': total_hits / total_requests,
'l1': self.metrics['hits']['l1'] / total_requests,
'l2': self.metrics['hits']['l2'] / total_requests,
'l3': self.metrics['hits']['l3'] / total_requests,
},
'performance': {
'avg_response_time': sum(response_times) / len(response_times),
'p50_response_time': sorted(response_times)[len(response_times) // 2],
'p95_response_time': sorted(response_times)[int(len(response_times) * 0.95)],
'p99_response_time': sorted(response_times)[int(len(response_times) * 0.99)],
},
'efficiency': {
'total_requests': total_requests,
'cache_efficiency': total_hits / total_requests,
'eviction_rate': self.metrics['evictions'] / total_requests,
},
'recommendations': self._generate_recommendations()
}
def _generate_recommendations(self) -> List[str]:
"""Generate optimization recommendations"""
recommendations = []
total_hits = sum(self.metrics['hits'].values())
total_requests = total_hits + self.metrics['misses']
hit_rate = total_hits / total_requests if total_requests > 0 else 0
if hit_rate < 0.8:
recommendations.append("Consider increasing cache size - hit rate below 80%")
if self.metrics['hits']['l1'] / total_requests < 0.6:
recommendations.append("L1 cache may be too small - consider increasing memory allocation")
if self.metrics['evictions'] / total_requests > 0.1:
recommendations.append("High eviction rate detected - optimize TTL settings")
avg_response = sum(self.metrics['response_times']) / len(self.metrics['response_times'])
if avg_response > 50:
recommendations.append("Average response time above target - check cache efficiency")
return recommendations# Development
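A small usage sketch showing how the analytics object could be fed from instrumented cache lookups and then queried for the report:

```python
analytics = CacheAnalytics()

# Record some synthetic traffic (response times in milliseconds)
analytics.record_hit('l1', 4.2)
analytics.record_hit('l2', 18.0)
analytics.record_miss(120.5)

report = analytics.get_performance_report()
print(report['hit_rate']['overall'])   # 0.666...
print(report['recommendations'])       # e.g. suggests increasing cache size
```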
```yaml
# Development
development:
  cache:
    l1_size: 1000
    l2_size: 5000
    l3_size: 10000
    default_ttl: 300
    warming_enabled: false

# Staging
staging:
  cache:
    l1_size: 5000
    l2_size: 20000
    l3_size: 50000
    default_ttl: 1800
    warming_enabled: true

# Production
production:
  cache:
    l1_size: 10000
    l2_size: 50000
    l3_size: 200000
    default_ttl: 3600
    warming_enabled: true
    warming_concurrency: 5
```
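Assuming the YAML above lives in a file such as cache.yml (the filename and environment variable below are illustrative), the environment-specific settings could be loaded like this:

```python
import os
import yaml  # PyYAML

with open("cache.yml") as f:
    all_envs = yaml.safe_load(f)

env = os.environ.get("FACT_ENV", "development")   # hypothetical environment selector
cache_settings = all_envs[env]["cache"]
print(cache_settings["l1_size"], cache_settings["default_ttl"])
```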
"""Automatic cache configuration tuning"""
@staticmethod
def calculate_optimal_sizes(
daily_query_volume: int,
avg_response_size_kb: float,
available_memory_mb: int,
target_hit_rate: float = 0.85
) -> Dict[str, int]:
"""Calculate optimal cache sizes"""
# Estimate working set size (queries accessed in rolling window)
working_set = int(daily_query_volume * 0.3) # 30% are frequently repeated
# Calculate memory allocation
l1_memory = int(available_memory_mb * 0.4) # 40% for L1
l2_memory = int(available_memory_mb * 0.3) # 30% for L2
# Calculate entry counts
avg_entry_size = avg_response_size_kb * 1024
l1_size = int((l1_memory * 1024 * 1024) / avg_entry_size)
l2_size = int((l2_memory * 1024 * 1024) / avg_entry_size)
# Adjust for target hit rate
total_needed = int(working_set / target_hit_rate)
l3_size = max(0, total_needed - l1_size - l2_size)
return {
'l1_size': min(l1_size, 50000), # Cap at reasonable limit
'l2_size': min(l2_size, 200000),
'l3_size': min(l3_size, 1000000),
'estimated_hit_rate': min(1.0, (l1_size + l2_size + l3_size) / working_set)
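For example, with roughly 100k queries per day, ~8 KB average responses, and 1 GB of cache memory (illustrative inputs), the tuner suggests sizes along these lines:

```python
sizes = CacheTuner.calculate_optimal_sizes(
    daily_query_volume=100_000,
    avg_response_size_kb=8.0,
    available_memory_mb=1024,
)
print(sizes)
# L1 and L2 alone cover the estimated working set here,
# so l3_size comes out as 0 and estimated_hit_rate as 1.0
```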
This caching strategy provides a comprehensive foundation for achieving optimal performance with FACT. The multi-tier architecture, intelligent key management, and adaptive policies ensure both speed and efficiency while maintaining data freshness.