Skip to content

Commit 0c373bd

Browse files
committed
implemented rate limit. add observability
1 parent e3e1fd2 commit 0c373bd

21 files changed

+1606
-258
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ EXPOSE 8000
4343

4444
# Health check
4545
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
46-
CMD curl -f http://localhost:8000/ || exit 1
46+
CMD curl -f http://localhost:8000/health || exit 1
4747

4848
# Run the application
49-
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers"]
49+
CMD ["python", "-m", "app.server"]

README.md

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ The service is built with **FastAPI** and features automatic route discovery bas
1313
- **Routing:** Dynamic — endpoints are auto-generated from the dbt `manifest.json`
1414
- **Documentation:** OpenAPI (Swagger UI) & ReDoc (auto-generated)
1515
- **Security:** Header-based API Key authentication (`X-API-Key`)
16-
- **Rate Limiting:** In-memory throttling per tier (Free/Pro/Unlimited) using `slowapi`
16+
- **Rate Limiting:** Tier-based per-pod in-memory throttling using `slowapi`
17+
- **Observability:** Structured JSON logging (stderr), Prometheus metrics (`/metrics`), Grafana dashboard
1718

1819
---
1920

@@ -23,17 +24,29 @@ The service is built with **FastAPI** and features automatic route discovery bas
2324
/cerebro-api
2425
├── Dockerfile # Multi-stage Docker build definition
2526
├── requirements.txt # Python dependencies
27+
├── requirements-dev.txt # Dev/test dependencies (pytest, httpx)
2628
├── .env.example # Template for environment variables
2729
├── api_keys.json # API keys configuration (git-ignored)
2830
├── .gitignore # Git ignore rules
29-
└── app
30-
├── main.py # App entry point
31-
├── config.py # Settings & Env var loading
32-
├── database.py # ClickHouse client wrapper
33-
├── security.py # Auth & Rate limiting logic
34-
├── manifest.py # Logic to download & parse dbt manifest
35-
└── factory.py # ⚙️ The Engine: auto-generates routes
36-
````
31+
├── app/
32+
│ ├── server.py # Process entrypoint (setup logging, start uvicorn)
33+
│ ├── main.py # FastAPI app, middleware, system endpoints
34+
│ ├── observability.py # JSON logging, Prometheus metrics, middleware
35+
│ ├── config.py # Settings & env var loading
36+
│ ├── database.py # ClickHouse client wrapper
37+
│ ├── security.py # Auth resolution, access enforcement, rate limiting
38+
│ ├── manifest.py # dbt manifest loader with structured logging
39+
│ ├── router_manager.py # Dynamic route lifecycle & background refresh
40+
│ ├── factory.py # Dynamic route generation engine
41+
│ └── api_metadata.py # Endpoint spec parsing & validation
42+
└── tests/
43+
├── conftest.py # Shared fixtures (mocked DB, manifest, API keys)
44+
├── test_endpoints.py # /, /health, /metrics tests
45+
├── test_auth.py # Auth resolution & tier access tests
46+
├── test_observability.py # JSON formatter, log_event, middleware tests
47+
├── test_rate_limiting.py # Rate limit enforcement & metrics tests
48+
└── test_manifest.py # Manifest refresh & router_manager tests
49+
```
3750

3851
---
3952

@@ -117,15 +130,30 @@ Create an `api_keys.json` file in your project root:
117130
### 5. Run the Server
118131

119132
```bash
120-
uvicorn app.main:app --reload
133+
python -m app.server
134+
```
135+
136+
For development with auto-reload:
137+
138+
```bash
139+
uvicorn app.main:app --reload --proxy-headers
121140
```
122141

123142
The API will be available at:
124143

125144
* Root: `http://127.0.0.1:8000`
145+
* Health check: `http://127.0.0.1:8000/health`
146+
* Prometheus metrics: `http://127.0.0.1:8000/metrics`
126147
* Interactive Docs (Swagger UI): `http://127.0.0.1:8000/docs`
127148
* Alternative Docs (ReDoc): `http://127.0.0.1:8000/redoc`
128149

150+
### 6. Run Tests
151+
152+
```bash
153+
pip install -r requirements-dev.txt
154+
./venv/bin/pytest tests/
155+
```
156+
129157
---
130158

131159
## API Authentication & Access Tiers
@@ -424,6 +452,54 @@ Create separate models for different time granularities:
424452

425453
---
426454

455+
## Observability
456+
457+
The API emits structured JSON logs to stderr and exposes Prometheus metrics at `/metrics`.
458+
459+
### Structured Logging
460+
461+
All log output is JSON, one object per line, with fields: `timestamp`, `level`, `logger`, `message`, `event`, plus context-specific fields. Logs never contain raw API keys, SQL text, query parameters, or request bodies.
462+
463+
Key log events:
464+
465+
| Event | Description |
466+
|-------|-------------|
467+
| `http_request` | Every HTTP request (method, route, status, duration) |
468+
| `api_access_denied` | Auth failures (reason, required/provided tier) |
469+
| `api_rate_limit` | Rate-limit blocks (tier, identity kind) |
470+
| `clickhouse_query` | ClickHouse queries (category, resource, granularity, tier, row count, duration) |
471+
| `manifest_refresh` | Manifest reload lifecycle (trigger, status, model count) |
472+
| `route_install` | Dynamic route registration (path, model, tier, methods) |
473+
474+
### Prometheus Metrics
475+
476+
All metrics are prefixed with `cerebro_api_`. Key metric families:
477+
478+
| Metric | Type | Labels |
479+
|--------|------|--------|
480+
| `http_requests_total` | Counter | method, route, status |
481+
| `http_request_duration_seconds` | Histogram | method, route |
482+
| `auth_resolutions_total` | Counter | required_tier, result |
483+
| `access_denied_total` | Counter | required_tier, provided_tier, reason |
484+
| `rate_limit_decisions_total` | Counter | tier, result, identity_kind |
485+
| `dynamic_requests_total` | Counter | category, resource, granularity, tier, method, status |
486+
| `dynamic_request_duration_seconds` | Histogram | category, resource, granularity, tier, method |
487+
| `clickhouse_query_duration_seconds` | Histogram | category, resource, granularity, tier, status |
488+
| `clickhouse_query_errors_total` | Counter | category, resource, granularity, tier |
489+
| `clickhouse_rows_returned` | Histogram | category, resource, granularity, tier |
490+
| `manifest_refresh_total` | Counter | trigger, status |
491+
| `manifest_models_loaded` | Gauge ||
492+
| `dynamic_routes_registered` | Gauge ||
493+
494+
### Kubernetes Integration
495+
496+
- **PodMonitor** scrapes `/metrics` on port `http` — Prometheus discovers pods directly
497+
- `/metrics` is blocked at the public ALB with a fixed-response 403 rule
498+
- K8s probes use `/health` (returns 200 when ClickHouse is reachable, 503 otherwise)
499+
- A dedicated Grafana dashboard (`cerebro-api-observability`) provides real-time visibility across traffic, auth, rate limits, dynamic API, ClickHouse, manifest lifecycle, pod resources, and structured logs
500+
501+
---
502+
427503
## Deployment (Docker)
428504

429505
This service is designed to run as a **stateless container** on Kubernetes.
@@ -532,13 +608,16 @@ curl -X POST http://localhost:8000/v1/system/manifest/refresh \
532608

533609
| File | Purpose |
534610
|------|---------|
535-
| `app/main.py` | FastAPI app initialization |
611+
| `app/server.py` | Process entrypoint — sets up logging before app import |
612+
| `app/main.py` | FastAPI app, CORS, middleware, system endpoints (`/`, `/health`, `/metrics`) |
613+
| `app/observability.py` | JSON log formatter, Prometheus metrics, HTTP middleware, observer helpers |
536614
| `app/config.py` | Settings & environment loading |
537615
| `app/database.py` | ClickHouse client wrapper |
538616
| `app/api_metadata.py` | dbt endpoint metadata parsing and validation |
539-
| `app/security.py` | Authentication & tier access control |
540-
| `app/manifest.py` | dbt manifest loader |
541-
| `app/factory.py` | Dynamic route generation engine |
617+
| `app/security.py` | Auth resolution, access enforcement, rate-limit helpers |
618+
| `app/manifest.py` | dbt manifest loader with structured logging and model gauge |
619+
| `app/router_manager.py` | Dynamic route lifecycle, background refresh, manifest refresh metrics |
620+
| `app/factory.py` | Dynamic route generation engine with instrumentation |
542621

543622
### Adding Custom Endpoints
544623

app/api_metadata.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ApiEndpointSpec:
5454
sort: List[ApiSortSpec]
5555
description: str
5656
metadata_enabled: bool = False
57+
category: str = "general"
58+
resource: str = "unknown"
59+
granularity: str = "none"
5760

5861

5962
@dataclass(frozen=True)

app/config.py

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
1-
import os
21
import json
3-
from typing import Dict, List, Optional, Any
4-
from pydantic_settings import BaseSettings
2+
import logging
3+
import os
4+
from typing import Any, Dict, Optional
5+
56
from pydantic import field_validator
7+
from pydantic_settings import BaseSettings
8+
9+
logger = logging.getLogger("cerebro_api.config")
610

711

812
def load_api_keys_from_file(filepath: str) -> Dict[str, Any]:
913
"""Load API keys from a JSON file if it exists."""
1014
if os.path.exists(filepath):
1115
try:
12-
with open(filepath, 'r') as f:
16+
with open(filepath, "r") as f:
1317
return json.load(f)
1418
except Exception as e:
15-
print(f"⚠️ Error loading API keys from {filepath}: {e}")
19+
logger.warning("Error loading API keys from %s: %s", filepath, e)
1620
return {}
1721

1822

@@ -28,15 +32,12 @@ class Settings(BaseSettings):
2832
API_CONFIG_PATH: str = "./api_config.yaml"
2933
DBT_MANIFEST_REFRESH_ENABLED: bool = True
3034
DBT_MANIFEST_REFRESH_INTERVAL_SECONDS: int = 300
31-
35+
3236
# API Keys file path (JSON file with user keys)
3337
API_KEYS_FILE: str = "./api_keys.json"
3438

3539
# ClickHouse
36-
# Option 1: Use URL (for ClickHouse Cloud)
37-
CLICKHOUSE_URL: Optional[str] = None # e.g., "ujt1j3jrk0.eu-central-1.aws.clickhouse.cloud"
38-
39-
# Option 2: Individual settings (URL takes precedence if provided)
40+
CLICKHOUSE_URL: Optional[str] = None
4041
CLICKHOUSE_HOST: str = "localhost"
4142
CLICKHOUSE_PORT: int = 8443
4243
CLICKHOUSE_USER: str = "default"
@@ -45,75 +46,58 @@ class Settings(BaseSettings):
4546
CLICKHOUSE_SECURE: bool = True
4647

4748
# Security: API Keys mapped to user info
48-
# Can be set via env var OR loaded from API_KEYS_FILE
49-
# Format: {
50-
# "sk_live_abc123": {"user": "alice", "tier": "tier0", "org": "Acme Inc"},
51-
# "sk_live_xyz789": {"user": "bob", "tier": "tier2", "org": "Partner Co"},
52-
# }
5349
API_KEYS: Dict[str, Any] = {}
5450

55-
@field_validator('API_KEYS', mode='before')
51+
@field_validator("API_KEYS", mode="before")
5652
@classmethod
5753
def normalize_api_keys(cls, v):
58-
"""
59-
Normalize API keys to full user format.
60-
Supports both simple format (key -> tier) and full format (key -> {user, tier, org}).
61-
"""
6254
if not isinstance(v, dict):
6355
return {}
64-
56+
6557
normalized = {}
6658
for key, value in v.items():
6759
if isinstance(value, str):
68-
# Simple format: "sk_key": "tier0" -> convert to full format
6960
normalized[key] = {
7061
"user": "anonymous",
7162
"tier": value,
72-
"org": None
63+
"org": None,
7364
}
7465
elif isinstance(value, dict):
75-
# Full format: ensure required fields exist
7666
normalized[key] = {
7767
"user": value.get("user", "anonymous"),
7868
"tier": value.get("tier", "tier0"),
79-
"org": value.get("org")
69+
"org": value.get("org"),
8070
}
8171
else:
82-
# Skip invalid entries
8372
continue
84-
73+
8574
return normalized
8675

87-
# Default tier for endpoints without a tier tag (for testing, set to tier0)
8876
DEFAULT_ENDPOINT_TIER: str = "tier0"
8977

90-
# Tier definitions with rate limits (requests per minute)
9178
TIER_RATE_LIMITS: Dict[str, int] = {
92-
"tier0": 20, # Public/Free tier
93-
"tier1": 100, # Partner tier
94-
"tier2": 500, # Premium tier
95-
"tier3": 10000, # Internal/Admin tier
79+
"tier0": 20,
80+
"tier1": 100,
81+
"tier2": 500,
82+
"tier3": 10000,
9683
}
9784

9885
class Config:
9986
env_file = ".env"
10087
case_sensitive = True
101-
extra = "ignore" # Allow extra env vars without raising errors
88+
extra = "ignore"
10289

10390
def __init__(self, **kwargs):
10491
super().__init__(**kwargs)
105-
# Debug: show if API_KEYS came from env
10692
if self.API_KEYS:
107-
print(f"✅ Loaded {len(self.API_KEYS)} API keys from environment variable")
108-
# Load API keys from file if env var is empty and file exists
93+
logger.info("Loaded %d API keys from environment variable", len(self.API_KEYS))
10994
elif self.API_KEYS_FILE:
11095
file_keys = load_api_keys_from_file(self.API_KEYS_FILE)
11196
if file_keys:
112-
# Normalize the loaded keys
11397
self.API_KEYS = self.normalize_api_keys(file_keys)
114-
print(f"✅ Loaded {len(self.API_KEYS)} API keys from {self.API_KEYS_FILE}")
98+
logger.info("Loaded %d API keys from %s", len(self.API_KEYS), self.API_KEYS_FILE)
11599
else:
116-
print(f"⚠️ No API keys found. Create {self.API_KEYS_FILE} or set API_KEYS env var.")
100+
logger.warning("No API keys found. Create %s or set API_KEYS env var.", self.API_KEYS_FILE)
117101

118102

119103
settings = Settings()

app/database.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,36 @@
1+
import logging
2+
from typing import Any, Dict, List
3+
14
import clickhouse_connect
5+
26
from app.config import settings
3-
from typing import Any, List, Dict
7+
8+
logger = logging.getLogger("cerebro_api.database")
9+
410

511
class ClickHouseClient:
612
_client = None
713

814
@classmethod
915
def get_client(cls):
10-
"""
11-
Returns a singleton ClickHouse client instance.
12-
"""
1316
if cls._client is None:
1417
cls._client = clickhouse_connect.get_client(
1518
host=settings.CLICKHOUSE_URL,
1619
port=settings.CLICKHOUSE_PORT,
1720
username=settings.CLICKHOUSE_USER,
1821
password=settings.CLICKHOUSE_PASSWORD,
1922
database=settings.CLICKHOUSE_DATABASE,
20-
secure=settings.CLICKHOUSE_SECURE
23+
secure=settings.CLICKHOUSE_SECURE,
2124
)
2225
return cls._client
2326

2427
@classmethod
2528
def query(cls, query_str: str, parameters: Dict[str, Any] = None) -> List[Dict]:
26-
"""
27-
Executes a parameterized query and returns a list of dictionaries.
28-
"""
2929
client = cls.get_client()
3030
try:
3131
result = client.query(query_str, parameters=parameters)
3232
columns = result.column_names
3333
return [dict(zip(columns, row)) for row in result.result_rows]
3434
except Exception as e:
35-
# In a real app, you might want to log this to Sentry/Datadog
36-
print(f"DB Error: {e}")
37-
raise e
35+
logger.error("ClickHouse query error: %s", e)
36+
raise e

0 commit comments

Comments
 (0)