-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathmain.py
More file actions
61 lines (45 loc) · 1.58 KB
/
main.py
File metadata and controls
61 lines (45 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""FastAPI application entry point with multi-tenant auth."""
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from temporalio.client import Client
from .config import get_settings
from .database.session import dispose_engine, init_engine
from .dependencies import get_current_user
from .routers import workflows
from .tenants import TenantRegistry
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize and tear down application resources."""
settings = get_settings()
engine = init_engine()
app.state.db_engine = engine
app.state.temporal_client = await Client.connect(settings.temporal_host)
# Load tenant registry
if not settings.auth_disabled:
app.state.tenant_registry = TenantRegistry.from_file(
settings.tenants_config_path,
)
else:
app.state.tenant_registry = TenantRegistry()
yield
dispose_engine()
app = FastAPI(lifespan=lifespan)
# Apply CORS middleware using settings
_settings = get_settings()
if _settings.allowed_origins:
app.add_middleware(
CORSMiddleware,
allow_origins=_settings.allowed_origins,
allow_methods=["GET"],
allow_headers=["*"],
)
app.include_router(workflows.router)
@app.get("/")
async def root(auth=Depends(get_current_user)):
"""Root endpoint."""
return {"message": "This is the backend service for Orcha!"}
@app.get("/health", include_in_schema=False)
async def health():
"""Health check endpoint, non-authenticated."""
return {"status": "ok"}