From fbf5068752389126f27eac8a0f914107e45a1eb3 Mon Sep 17 00:00:00 2001 From: Lina Date: Wed, 4 Feb 2026 12:57:38 +0100 Subject: [PATCH] Cursor files refinery-authorizer --- .cursor/rules/authorization.mdc | 121 +++++++++++++++++++++++++ .cursor/rules/exceptions.mdc | 122 +++++++++++++++++++++++++ .cursor/rules/fastapi-routes.mdc | 147 ++++++++++++++++++++++++++++++ .cursor/rules/guidelines.mdc | 61 +++++++++++++ .cursor/rules/telemetry.mdc | 151 +++++++++++++++++++++++++++++++ .cursor/rules/testing.mdc | 127 ++++++++++++++++++++++++++ 6 files changed, 729 insertions(+) create mode 100644 .cursor/rules/authorization.mdc create mode 100644 .cursor/rules/exceptions.mdc create mode 100644 .cursor/rules/fastapi-routes.mdc create mode 100644 .cursor/rules/guidelines.mdc create mode 100644 .cursor/rules/telemetry.mdc create mode 100644 .cursor/rules/testing.mdc diff --git a/.cursor/rules/authorization.mdc b/.cursor/rules/authorization.mdc new file mode 100644 index 0000000..89c7b31 --- /dev/null +++ b/.cursor/rules/authorization.mdc @@ -0,0 +1,121 @@ +--- +description: Rules for authorization logic and access control patterns +globs: ["main.py"] +alwaysApply: true +--- + +# Authorization Guidelines + +Authorization logic evaluates whether users have access to specific resources based on identity, roles, and configured policies. 
+ +## Authorization Pattern + +**Basic structure:** +```python +@app.post("/authorize") +def authorize(body: dict, response: Response): + if body["resource"] == "kratos:admin": + return resolve_kratos_admin(body, response) + + response.status_code = status.HTTP_403_FORBIDDEN + return {"status": "not authorized"} +``` + +## Resource-Based Routing + +Route authorization decisions by resource type: + +```python +def authorize(body: dict, response: Response): + resource = body.get("resource") + + if resource == "kratos:admin": + return resolve_kratos_admin(body, response) + elif resource == "other:resource": + return resolve_other_resource(body, response) + + # Default: deny access + response.status_code = status.HTTP_403_FORBIDDEN + return {"status": "not authorized"} +``` + +## Authorization Decision Functions + +**Structure:** +```python +def resolve_kratos_admin(body: dict, response: Response) -> dict: + subject = body["subject"]["identity"] + + # Check authorization conditions + if is_authorized(subject): + response.status_code = status.HTTP_200_OK + return {"status": "authorized"} + + response.status_code = status.HTTP_403_FORBIDDEN + return {"status": "not authorized"} +``` + +## Authorization Checks + +**Role-based check:** +```python +# Handle None metadata_public safely +metadata = subject.get("metadata_public") or {} +if metadata.get("role") == "ADMIN": + # Authorized +``` + +**Verification check:** +```python +if (subject.get("verifiable_addresses") or [{}])[0].get("verified"): + # Email verified (a missing, None, or empty address list counts as unverified) +``` + +## Safe Data Access + +**Handle optional fields:** +```python +# Use .get() with defaults for optional fields +metadata = subject.get("metadata_public") or {} + +# Use .get() with default for nested access +role = (subject.get("metadata_public") or {}).get("role") +``` + +**Validate required fields exist:** +```python +if "subject" not in body or "identity" not in body["subject"]: + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "invalid request"} 
+``` + +## Response Patterns + +**Authorized:** +```python +response.status_code = status.HTTP_200_OK +return {"status": "authorized"} +``` + +**Forbidden:** +```python +response.status_code = status.HTTP_403_FORBIDDEN +return {"status": "not authorized"} +``` + +**Invalid Request:** +```python +response.status_code = status.HTTP_400_BAD_REQUEST +return {"status": "invalid request", "error": "missing required field"} +``` + +## Best Practices + +1. Always check all authorization conditions before granting access +2. Use safe access patterns (`.get()` with defaults) for optional fields +3. Validate required fields exist before accessing nested data +4. Return consistent response format: `{"status": "authorized"}` or `{"status": "not authorized"}` +5. Set appropriate HTTP status codes (200, 403, 400) +6. Keep authorization logic focused and testable +7. Document authorization policies clearly in code comments +8. Handle edge cases (None values, missing fields, empty arrays) diff --git a/.cursor/rules/exceptions.mdc b/.cursor/rules/exceptions.mdc new file mode 100644 index 0000000..04402bc --- /dev/null +++ b/.cursor/rules/exceptions.mdc @@ -0,0 +1,122 @@ +--- +description: Rules for exception handling and error responses +globs: ["main.py"] +alwaysApply: true +--- + +# Exceptions Guidelines + +Handle errors gracefully with appropriate HTTP status codes and clear error messages. 
+ +## HTTP Status Codes + +**Standard status codes:** +- `200 OK`: Authorization granted +- `400 Bad Request`: Invalid request format or missing required fields +- `403 Forbidden`: Authorization denied +- `500 Internal Server Error`: Unexpected server error + +## Error Response Format + +**Consistent error structure:** +```python +# Success +{"status": "authorized"} + +# Denied +{"status": "not authorized"} + +# Error +{"status": "error", "message": "description of error"} +``` + +## Request Validation + +**Check required fields:** +```python +@app.post("/authorize") +def authorize(body: dict, response: Response): + if "resource" not in body: + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": "resource is required"} + + if "subject" not in body: + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": "subject is required"} +``` + +**Validate nested structure:** +```python +subject = body.get("subject", {}) +if "identity" not in subject: + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": "subject.identity is required"} +``` + +## Safe Data Access + +**Handle missing or None values:** +```python +# Use .get() with defaults +metadata = subject.get("metadata_public") or {} + +# Check array existence before indexing +if not subject.get("verifiable_addresses"): + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": "verifiable_addresses is required"} + +# Safe array access +addresses = subject.get("verifiable_addresses", []) +if addresses and addresses[0].get("verified"): + # Process +``` + +## Exception Handling + +**Catch and handle exceptions:** +```python +@app.post("/authorize") +def authorize(body: dict, response: Response): + try: + # Authorization logic + if body["resource"] == "kratos:admin": + return resolve_kratos_admin(body, response) + except KeyError as e: + response.status_code = 
status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": f"Missing required field: {e}"} + except (IndexError, AttributeError): + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": "Invalid data structure"} + except Exception as e: + # Log the full exception for debugging + import logging + logging.error(f"Unexpected error: {e}", exc_info=True) + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return {"status": "error", "message": "Internal server error"} +``` + +## Authorization Denial + +**Return 403 for authorization failures:** +```python +def resolve_kratos_admin(body: dict, response: Response): + subject = body["subject"]["identity"] + + if not is_authorized(subject): + response.status_code = status.HTTP_403_FORBIDDEN + return {"status": "not authorized"} + + response.status_code = status.HTTP_200_OK + return {"status": "authorized"} +``` + +## Best Practices + +1. Always validate request structure before processing +2. Use appropriate HTTP status codes (400 for bad requests, 403 for denied, 500 for errors) +3. Return consistent error response format +4. Use safe access patterns (`.get()` with defaults) for optional fields +5. Log unexpected exceptions with full traceback for debugging +6. Don't expose internal error details in production responses +7. Handle edge cases (None values, empty arrays, missing keys) +8. Provide clear error messages for debugging diff --git a/.cursor/rules/fastapi-routes.mdc b/.cursor/rules/fastapi-routes.mdc new file mode 100644 index 0000000..dbf279f --- /dev/null +++ b/.cursor/rules/fastapi-routes.mdc @@ -0,0 +1,147 @@ +--- +description: Rules for FastAPI route definitions and HTTP handling +globs: ["main.py"] +alwaysApply: true +--- + +# FastAPI Routes Guidelines + +Routes handle HTTP request/response logic. Keep routes simple and focused on authorization decisions. 
+ +## Route Structure + +**Basic route:** +```python +from fastapi import FastAPI, Response, status + +app = FastAPI(title="refinery-authorizer") + +@app.post("/authorize") +def authorize(body: dict, response: Response): + # Authorization logic + pass +``` + +## Health Check Routes + +**JSON health endpoint:** +```python +@app.get("/health") +async def root(): + return {"alive": "true"} +``` + +**Plain text healthcheck (for load balancers):** +```python +from fastapi import responses + +@app.get("/healthcheck") +def healthcheck() -> responses.PlainTextResponse: + return responses.PlainTextResponse("OK") +``` + +## Request Body Handling + +**Accept dict for flexible authorization requests:** +```python +@app.post("/authorize") +def authorize(body: dict, response: Response): + resource = body.get("resource") + subject = body.get("subject", {}) + # Process authorization +``` + +**For type safety, consider Pydantic models:** +```python +from pydantic import BaseModel +from typing import Optional, Dict, Any + +class AuthorizeRequest(BaseModel): + resource: str + subject: Dict[str, Any] + +@app.post("/authorize") +def authorize(body: AuthorizeRequest, response: Response): + if body.resource == "kratos:admin": + return resolve_kratos_admin(body.dict(), response) +``` + +## Response Handling + +**Set status codes explicitly:** +```python +from fastapi import Response, status + +response.status_code = status.HTTP_200_OK +return {"status": "authorized"} + +response.status_code = status.HTTP_403_FORBIDDEN +return {"status": "not authorized"} +``` + +**Use FastAPI status constants:** +```python +status.HTTP_200_OK # Success +status.HTTP_400_BAD_REQUEST # Invalid request +status.HTTP_403_FORBIDDEN # Forbidden +status.HTTP_500_INTERNAL_SERVER_ERROR # Server error +``` + +## Error Handling + +**Handle missing fields:** +```python +@app.post("/authorize") +def authorize(body: dict, response: Response): + if "resource" not in body: + response.status_code = 
status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": "resource is required"} + + # Continue processing +``` + +**Handle exceptions:** +```python +from fastapi import HTTPException + +@app.post("/authorize") +def authorize(body: dict, response: Response): + try: + # Authorization logic + pass + except KeyError as e: + response.status_code = status.HTTP_400_BAD_REQUEST + return {"status": "error", "message": f"Missing field: {e}"} + except Exception as e: + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return {"status": "error", "message": "Internal server error"} +``` + +## Async vs Sync + +**Use async for I/O operations:** +```python +@app.get("/health") +async def root(): + # If you need async I/O + return {"alive": "true"} +``` + +**Use sync for simple logic:** +```python +@app.post("/authorize") +def authorize(body: dict, response: Response): + # Simple authorization logic doesn't need async + pass +``` + +## Best Practices + +1. Keep routes thin - delegate complex logic to separate functions +2. Always set appropriate HTTP status codes +3. Return consistent response formats +4. Validate request data before processing +5. Handle errors gracefully with appropriate status codes +6. Use type hints for all parameters +7. Document route purpose with clear function names +8. Use async only when needed (I/O operations) diff --git a/.cursor/rules/guidelines.mdc b/.cursor/rules/guidelines.mdc new file mode 100644 index 0000000..ebbc23a --- /dev/null +++ b/.cursor/rules/guidelines.mdc @@ -0,0 +1,61 @@ +--- +description: Main guidelines and architecture overview for refinery-authorizer +alwaysApply: true +--- + +# Refinery Authorizer Guidelines + +Authorization service guidelines ensuring consistency across API routes, authorization logic, telemetry, and testing. + +## Overview + +The `refinery-authorizer` is a FastAPI-based microservice that evaluates whether a user has access to certain resources. 
It serves as an authorization decision point, primarily for Kratos admin access, within the Refinery platform ecosystem. + +## Architecture + +The authorizer follows a simple, focused architecture: + +- **FastAPI Routes** (`main.py`): HTTP endpoint definitions for authorization decisions and health checks +- **Authorization Logic** (`main.py`): Business logic for evaluating access permissions +- **Telemetry** (`telemetry.py`): OpenTelemetry instrumentation and Prometheus metrics +- **Tests** (`tests/`): Test suite using pytest and FastAPI TestClient + +## Guideline Files + +- **[authorization.mdc](./authorization.mdc)** - Rules for authorization logic and decision patterns +- **[fastapi-routes.mdc](./fastapi-routes.mdc)** - Rules for FastAPI route definitions +- **[telemetry.mdc](./telemetry.mdc)** - Rules for observability and telemetry +- **[exceptions.mdc](./exceptions.mdc)** - Rules for exception handling +- **[testing.mdc](./testing.mdc)** - Rules for testing patterns + +## General Principles + +1. **Simplicity**: Keep the service focused on authorization decisions - avoid unnecessary complexity +2. **Type Safety**: Use type hints consistently, leverage Pydantic for request validation +3. **Security First**: Always validate inputs, handle edge cases, return appropriate HTTP status codes +4. **Observability**: Ensure all requests are instrumented for tracing and metrics +5. **Testability**: Write clear, focused tests for authorization logic +6. 
**Stateless**: Authorization decisions should be stateless and idempotent + +## Directory Structure + +``` +refinery-authorizer/ +├── main.py # FastAPI application and authorization routes +├── telemetry.py # OpenTelemetry and Prometheus instrumentation +├── conftest.py # Pytest configuration and fixtures +├── tests/ # Test suite +│ └── test_main.py # Route and authorization logic tests +├── requirements/ # Python dependencies +└── .cursor/rules/ # Cursor AI rules +``` + +## Quick Reference + +**Authorization**: Evaluate access based on resource type, user identity, and configured policies. Return HTTP 200 for authorized, 403 for forbidden. + +**Telemetry**: Enabled via `ENABLE_TELEMETRY` environment variable. Exports traces to OTLP endpoint and exposes Prometheus metrics at `/metrics`. + +**Health Checks**: `/health` returns JSON status, `/healthcheck` returns plain text "OK" for load balancer checks. + +**Testing**: Use `pytest` with FastAPI `TestClient` fixture. Test both authorized and unauthorized scenarios. diff --git a/.cursor/rules/telemetry.mdc b/.cursor/rules/telemetry.mdc new file mode 100644 index 0000000..9d1f4bf --- /dev/null +++ b/.cursor/rules/telemetry.mdc @@ -0,0 +1,151 @@ +--- +description: Rules for observability, telemetry, and monitoring +globs: ["telemetry.py", "main.py"] +alwaysApply: true +--- + +# Telemetry Guidelines + +Telemetry provides observability through OpenTelemetry tracing and Prometheus metrics. 
+ +## Configuration + +**Environment variables:** +```python +import os + +OTLP_GRPC_ENDPOINT = os.getenv("OTLP_GRPC_ENDPOINT", "tempo:4317") +ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false") == "true" +``` + +**Conditional setup:** +```python +if telemetry.ENABLE_TELEMETRY: + print("WARNING: Running telemetry.", flush=True) + telemetry.setting_app_name(app_name) + telemetry.setting_otlp(app, app_name=app_name, endpoint=OTLP_GRPC_ENDPOINT) + app.add_middleware(telemetry.PrometheusMiddleware, app_name=app_name) + app.add_route("/metrics", telemetry.metrics) +``` + +## OpenTelemetry Setup + +**Configure OTLP exporter:** +```python +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +def setting_otlp(app: ASGIApp, app_name: str, endpoint: str): + resource = Resource.create( + attributes={"service.name": app_name, "compose_service": app_name} + ) + + tracer = TracerProvider(resource=resource) + trace.set_tracer_provider(tracer) + + tracer.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True)) + ) + + FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer) +``` + +## Prometheus Metrics + +**Define metrics:** +```python +from prometheus_client import Counter, Gauge, Histogram + +REQUESTS = Counter( + "fastapi_requests_total", + "Total count of requests by method and path.", + ["method", "path", "app_name"], +) + +RESPONSES = Counter( + "fastapi_responses_total", + "Total count of responses by method, path and status codes.", + ["method", "path", "status_code", "app_name"], +) + +REQUESTS_PROCESSING_TIME = Histogram( + "fastapi_requests_duration_seconds", + "Histogram of requests processing time by path (in seconds)", + ["method", "path", "app_name"], +) +``` + +## Prometheus Middleware + 
+**Track requests:** +```python +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response + +class PrometheusMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next): + method = request.method + path, is_handled_path = self.get_path(request) + + if not is_handled_path: + return await call_next(request) + + REQUESTS.labels(method=method, path=path, app_name=self.app_name).inc() + before_time = time.perf_counter() + + try: + response = await call_next(request) + except BaseException as e: + EXCEPTIONS.labels(...).inc() + raise + else: + status_code = response.status_code + after_time = time.perf_counter() + REQUESTS_PROCESSING_TIME.labels(...).observe(after_time - before_time) + + RESPONSES.labels(...).inc() + return response +``` + +## Metrics Endpoint + +**Expose Prometheus metrics:** +```python +from prometheus_client.openmetrics.exposition import generate_latest, CONTENT_TYPE_LATEST +from starlette.responses import Response + +def metrics(request: Request) -> Response: + return Response( + generate_latest(REGISTRY), + headers={"Content-Type": CONTENT_TYPE_LATEST} + ) + +# Register route +app.add_route("/metrics", telemetry.metrics) +``` + +## Log Filtering + +**Filter out metrics endpoint from access logs:** +```python +import logging + +logging.getLogger("uvicorn.access").addFilter( + lambda record: "GET /metrics" not in record.getMessage() +) +``` + +## Best Practices + +1. Make telemetry opt-in via environment variable +2. Use consistent app_name across all metrics +3. Include method, path, and app_name labels in metrics +4. Track request duration, counts, and status codes +5. Filter out health check and metrics endpoints from logs +6. Use BatchSpanProcessor for efficient trace export +7. Set appropriate resource attributes for service identification +8. 
Handle telemetry failures gracefully (don't break the app) diff --git a/.cursor/rules/testing.mdc b/.cursor/rules/testing.mdc new file mode 100644 index 0000000..3c3ef6f --- /dev/null +++ b/.cursor/rules/testing.mdc @@ -0,0 +1,127 @@ +--- +description: Rules for testing patterns and test structure +globs: ["tests/**/*.py", "conftest.py"] +alwaysApply: true +--- + +# Testing Guidelines + +Tests ensure authorization logic works correctly for all scenarios. + +## Test Structure + +**Use pytest with FastAPI TestClient:** +```python +import pytest +from fastapi.testclient import TestClient +from typing import Iterator +from main import app + +@pytest.fixture +def client() -> Iterator[TestClient]: + with TestClient(app) as client: + yield client +``` + +## Test Patterns + +**Health check tests:** +```python +def test_healthcheck(client: TestClient): + response = client.get("/healthcheck") + assert response.status_code == 200 + assert response.text == "OK" +``` + +**Health endpoint tests:** +```python +def test_health(client: TestClient): + response = client.get("/health") + assert response.status_code == 200 + assert response.json() == {"alive": "true"} +``` + +## Authorization Tests + +**Test authorized scenarios:** +```python +def test_authorize_kratos_admin_authorized(client: TestClient): + body = { + "resource": "kratos:admin", + "subject": { + "identity": { + "traits": {"email": "user@kern.ai"}, + "verifiable_addresses": [{"verified": True}] + } + } + } + response = client.post("/authorize", json=body) + assert response.status_code == 200 + assert response.json() == {"status": "authorized"} +``` + +**Test unauthorized scenarios:** +```python +def test_authorize_kratos_admin_unauthorized(client: TestClient): + body = { + "resource": "kratos:admin", + "subject": { + "identity": { + "traits": {"email": "user@example.com"}, + "verifiable_addresses": [{"verified": True}] + } + } + } + response = client.post("/authorize", json=body) + assert response.status_code == 403 + 
assert response.json() == {"status": "not authorized"} +``` + +**Test invalid requests:** +```python +def test_authorize_missing_resource(client: TestClient): + body = { + "subject": { + "identity": { + "traits": {"email": "user@kern.ai"}, + "verifiable_addresses": [{"verified": True}] + } + } + } + response = client.post("/authorize", json=body) + assert response.status_code == 400 +``` + +## Test Coverage + +**Test all authorization paths:** +- Authorized scenarios (email domain match, role match) +- Unauthorized scenarios (wrong domain, no role, unverified email) +- Invalid requests (missing fields, wrong structure) +- Edge cases (None values, empty arrays, missing nested fields) + +## Test Organization + +**Group related tests:** +```python +class TestKratosAdminAuthorization: + def test_email_domain_match(self, client: TestClient): + pass + + def test_role_match(self, client: TestClient): + pass + + def test_unverified_email(self, client: TestClient): + pass +``` + +## Best Practices + +1. Test both success and failure paths +2. Test edge cases (None, empty arrays, missing fields) +3. Use descriptive test names that explain what is being tested +4. Keep tests focused - one assertion per test concept +5. Use fixtures for common test data +6. Test authorization logic thoroughly - security is critical +7. Test error handling and invalid inputs +8. Use TestClient for integration-style tests