Architecture
Comprehensive overview of FACT's system architecture, design patterns, and component interactions.
FACT follows a layered architecture designed for high performance, scalability, and maintainability:
┌─────────────────────────────────────────────────────────┐
│ Presentation Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ CLI │ │ API │ │ Web │ │ WASM │ │
│ │Interface │ │Endpoints │ │Interface │ │Browser │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────┘ │
└─────────────────────┬───────────────────────────────────┘
│
┌─────────────────────▼───────────────────────────────────┐
│ Business Logic Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ Query │ │ Template │ │ Security │ │
│ │ Processor │ │ Engine │ │ Manager │ │
│ └──────────────┘ └──────────────┘ └────────────────┘ │
└─────────────────────┬───────────────────────────────────┘
│
┌─────────────────────▼───────────────────────────────────┐
│ Service Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ Cache │ │ Database │ │ Tools │ │
│ │ Manager │ │ Manager │ │ Executor │ │
│ └──────────────┘ └──────────────┘ └────────────────┘ │
└─────────────────────┬───────────────────────────────────┘
│
┌─────────────────────▼───────────────────────────────────┐
│ Infrastructure Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ Storage │ │ Network │ │ Monitoring │ │
│ │ Systems │ │ Services │ │ & Logging │ │
│ └──────────────┘ └──────────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────┘
Central orchestrator that coordinates all system components:
class FACTDriver:
def __init__(self, config: Config):
self.config = config
self.cache = Cache(config.cache_config)
self.query_processor = QueryProcessor()
self.tool_executor = ToolExecutor()
self.security_manager = SecurityManager()
async def process_query(self, query: str) -> str:
# 1. Security validation
self.security_manager.validate_query(query)
# 2. Cache check
cache_key = self._generate_cache_key(query)
cached_result = await self.cache.get(cache_key)
if cached_result:
return cached_result
# 3. Process query
result = await self.query_processor.process(query)
# 4. Cache result
await self.cache.set(cache_key, result)
return resultHandles natural language query processing and routing:
class QueryProcessor:
def __init__(self):
self.claude_client = AnthropicClient()
self.template_engine = TemplateEngine()
async def process(self, query: str) -> str:
# Analyze query intent
intent = await self._analyze_intent(query)
# Select appropriate template
template = self.template_engine.select_template(intent)
# Process with Claude
response = await self.claude_client.complete(
template.format_prompt(query)
)
return responseManages cognitive templates for different query types:
pub struct TemplateEngine {
    templates: HashMap<String, Template>,
    registry: TemplateRegistry,
}

impl TemplateEngine {
    /// Return the template matching the query's intent, falling back to "default".
    ///
    /// Panics only if no "default" template is registered AND the intent lookup misses.
    pub fn select_template(&self, query: &str) -> &Template {
        let intent = self.analyze_intent(query);
        // `unwrap_or_else` keeps the fallback lookup lazy. The previous eager
        // `unwrap_or(self.templates.get("default").unwrap())` evaluated its
        // argument unconditionally, panicking whenever "default" was missing
        // even when the intent lookup itself succeeded.
        self.templates
            .get(&intent)
            .unwrap_or_else(|| {
                self.templates
                    .get("default")
                    .expect("no default template registered")
            })
    }

    /// Register (or replace) a template under its own name.
    pub fn register_template(&mut self, template: Template) {
        self.templates.insert(template.name.clone(), template);
    }
}

Multi-tier intelligent caching system:
class CacheManager:
def __init__(self, config: CacheConfig):
self.l1_cache = MemoryCache(config.memory_size) # Hot data
self.l2_cache = DiskCache(config.disk_size) # Warm data
self.l3_cache = RemoteCache(config.remote_config) # Cold data
self.stats = CacheStats()
async def get(self, key: str) -> Optional[str]:
# Try L1 (memory) first
result = self.l1_cache.get(key)
if result:
self.stats.record_hit('l1')
return result
# Try L2 (disk)
result = await self.l2_cache.get(key)
if result:
self.stats.record_hit('l2')
# Promote to L1
self.l1_cache.set(key, result)
return result
# Try L3 (remote)
result = await self.l3_cache.get(key)
if result:
self.stats.record_hit('l3')
# Promote to L2 and L1
await self.l2_cache.set(key, result)
self.l1_cache.set(key, result)
return result
self.stats.record_miss()
return Nonegraph TD
A[Client Request] --> B[Security Layer]
B --> C{Valid Request?}
C -->|No| D[Return Error]
C -->|Yes| E[Generate Cache Key]
E --> F{Cache Hit?}
F -->|Yes| G[Return Cached Result]
F -->|No| H[Process Query]
H --> I[Template Selection]
I --> J[Claude API Call]
J --> K[Process Response]
K --> L[Update Cache]
L --> M[Return Result]
style A fill:#e1f5fe
style G fill:#c8e6c9
style M fill:#c8e6c9
style D fill:#ffcdd2
graph LR
A[Query] --> B[Hash Key]
B --> C{L1 Cache}
C -->|Hit| D[Return Result]
C -->|Miss| E{L2 Cache}
E -->|Hit| F[Promote to L1]
E -->|Miss| G{L3 Cache}
G -->|Hit| H[Promote to L2 & L1]
G -->|Miss| I[Process Query]
I --> J[Store in All Levels]
F --> D
H --> D
J --> D
Query processing uses command pattern for extensibility:
class QueryCommand(ABC):
@abstractmethod
async def execute(self, context: QueryContext) -> QueryResult:
pass
class AnalysisCommand(QueryCommand):
async def execute(self, context: QueryContext) -> QueryResult:
# Implement analysis logic
pass
class AggregationCommand(QueryCommand):
async def execute(self, context: QueryContext) -> QueryResult:
# Implement aggregation logic
passTemplate selection uses strategy pattern:
class TemplateSelectionStrategy(ABC):
@abstractmethod
def select_template(self, query: str) -> str:
pass
class KeywordStrategy(TemplateSelectionStrategy):
def select_template(self, query: str) -> str:
if 'analyze' in query.lower():
return 'analysis-basic'
elif 'aggregate' in query.lower():
return 'data-aggregation'
return 'default'
class MLStrategy(TemplateSelectionStrategy):
def select_template(self, query: str) -> str:
# Use ML model for template selection
passEvent system for monitoring and logging:
class EventBus:
    """Minimal observer registry: observers subscribe per event type, publish fans out."""

    def __init__(self):
        # defaultdict(list) lets subscribe/publish skip key-existence checks
        self.observers = defaultdict(list)

    def subscribe(self, event_type: str, observer: Observer):
        self.observers[event_type].append(observer)

    def publish(self, event: Event):
        # Events with no subscribers are silently dropped (empty default list)
        for observer in self.observers[event.type]:
            observer.handle(event)


class MetricsObserver(Observer):
    """Records metrics for processed-query events; ignores everything else."""

    def handle(self, event: Event):
        if event.type == 'query_processed':
            self.record_query_metric(event.data)

# Simple deployment for development/small scale
single_instance:
components:
- fact_application
- sqlite_database
- local_cache
resources:
cpu: 2 cores
memory: 4GB
storage: 50GB
scalability: Limited
availability: Single point of failure

# Production cluster deployment
cluster:
load_balancer:
type: nginx
instances: 2
health_checks: enabled
application_tier:
instances: 3
auto_scaling: enabled
min_instances: 2
max_instances: 10
cache_tier:
type: redis_cluster
instances: 3
replication: enabled
database_tier:
type: postgresql
primary: 1
replicas: 2
backup: enabled

# Microservices deployment
microservices:
query_service:
replicas: 3
resources:
cpu: 1 core
memory: 2GB
cache_service:
replicas: 2
resources:
cpu: 0.5 core
memory: 4GB
template_service:
replicas: 2
resources:
cpu: 0.5 core
memory: 1GB
security_service:
replicas: 2
resources:
cpu: 0.5 core
memory: 1GB

┌─────────────────────────────────────────┐
│ L1 Cache (Memory) │
│ Size: 1000 entries | TTL: 300s │
│ Hit Rate: 85% | Latency: <1ms │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ L2 Cache (Disk) │
│ Size: 10,000 entries | TTL: 3600s │
│ Hit Rate: 10% | Latency: <10ms │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ L3 Cache (Remote) │
│ Size: 100,000 entries | TTL: 86400s │
│ Hit Rate: 4% | Latency: <50ms │
└─────────────────────────────────────────┘
Input Query
↓
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Validation │ → │ Parsing │ → │ Template │
│ <1ms │ │ <5ms │ │ Selection │
└─────────────┘ └──────────────┘ └──────┬──────┘
↓
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Result │ ← │ Processing │ ← │ Cache │
│ Formatting │ │ <100ms │ │ Check │
└─────────────┘ └──────────────┘ └─────────────┘
class ConfigurationManager:
    """Layered configuration lookup: environment overrides file, file overrides defaults."""

    def __init__(self):
        # Ordered highest-priority first; get() returns the first non-None value found
        self.config_sources = [
            EnvironmentConfig(),  # Highest priority
            FileConfig(),         # Medium priority
            DefaultConfig()       # Lowest priority
        ]

    def get(self, key: str) -> Any:
        """Return *key* from the highest-priority source that defines it.

        Raises ConfigurationError when no source knows the key.
        """
        for source in self.config_sources:
            value = source.get(key)
            if value is not None:
                return value
        raise ConfigurationError(f"Configuration key not found: {key}")

# config/base.yaml
cache:
default_ttl: 3600
max_size: 10000
security:
enable_validation: true
max_query_length: 10000
---
# config/development.yaml
cache:
max_size: 1000 # Override for dev
logging:
level: DEBUG
---
# config/production.yaml
cache:
max_size: 50000 # Override for prod
logging:
level: INFO
monitoring:
enabled: true

┌─────────────────────────────────────────┐
│ Network Security │
│ • Firewall rules │
│ • TLS/SSL encryption │
│ • Rate limiting │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Application Security │
│ • Input validation │
│ • Authentication │
│ • Authorization │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Data Security │
│ • Encryption at rest │
│ • Secure key management │
│ • Audit logging │
└─────────────────────────────────────────┘
class LoadBalancer:
    """Round-robin dispatcher over registered FACT instances."""

    def __init__(self):
        self.instances = []
        self.current_index = 0

    def add_instance(self, instance: FACTInstance):
        self.instances.append(instance)

    def get_instance(self) -> FACTInstance:
        """Return the next instance in round-robin order.

        Raises IndexError when no instances have been registered.
        """
        if not self.instances:
            # Same exception type as the bare empty-list indexing, but with a
            # message that names the actual problem instead of "index out of range"
            raise IndexError("no instances registered with the load balancer")
        # Round-robin load balancing
        instance = self.instances[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.instances)
        return instance

# Kubernetes auto-scaling configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: fact-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: fact-deployment
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80

monitoring:
metrics:
- prometheus
- grafana
logging:
- elasticsearch
- logstash
- kibana
tracing:
- jaeger
- opentelemetry
alerting:
- alertmanager
- pagerduty

# Application metrics
QUERY_DURATION = Histogram('fact_query_duration_seconds')
CACHE_HIT_RATE = Gauge('fact_cache_hit_rate')
ACTIVE_CONNECTIONS = Gauge('fact_active_connections')
ERROR_RATE = Counter('fact_errors_total')
# System metrics
CPU_USAGE = Gauge('system_cpu_usage_percent')
MEMORY_USAGE = Gauge('system_memory_usage_bytes')
DISK_USAGE = Gauge('system_disk_usage_bytes')This architecture provides a solid foundation for building scalable, performant, and maintainable FACT applications. The modular design allows for easy extension and adaptation to different use cases and deployment scenarios.