Skip to content

Commit 8cbe765

Browse files
committed
Added db persistence to event ingestion
1 parent 8622ebd commit 8cbe765

File tree

5 files changed

+61
-16
lines changed

5 files changed

+61
-16
lines changed

backend/app/api/routes/events.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,38 @@
11
from fastapi import APIRouter, status
22
from app.models.event import EventIn
3+
from app.db import database
34
from app.core.logging import get_logger
5+
import json
46

57
router = APIRouter(prefix="/events", tags=["events"])
68
logger = get_logger("eventrelay.events")
79

810
@router.post("", status_code=status.HTTP_202_ACCEPTED)
911
async def ingest_event(event: EventIn):
10-
logger.info(
11-
"event_recieved",
12-
extra={
13-
"extra": {
14-
"source": event.source,
15-
"type": event.type,
16-
"timestamp": event.timestamp.isoformat(),
17-
}
18-
},
19-
)
12+
query = """
13+
INSERT INTO events(source, type, payload)
14+
VALUES (:source, :type, :payload)
15+
RETURNING id;
16+
"""
17+
# Convert payload dict to JSON string for JSONB column
18+
values = event.dict()
19+
values['payload'] = json.dumps(values['payload'])
20+
21+
event_id = await database.execute(query = query, values=values)
22+
23+
logger.info(f"Stored event id={event_id} source={event.source} type={event.type}")
2024

2125
return {
2226
"status": "accepted",
23-
"event_type": event.type,
24-
}
27+
"id": event_id,
28+
}
29+
30+
@router.get("")
31+
async def get_all_events():
32+
query = "SELECT * FROM events"
33+
34+
events = await database.fetch_all(query = query)
35+
36+
logger.info(f"Retrieved all events")
37+
38+
return events

backend/app/core/config.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from dotenv import load_dotenv
2+
import os
3+
from pathlib import Path
4+
5+
BASE_DIR = Path(__file__).resolve().parent.parent.parent
6+
load_dotenv(BASE_DIR / ".env")
7+
8+
DB_USER = os.getenv("POSTGRES_USER")
9+
DB_PASSWORD = os.getenv("POSTGRES_PASSWORD")
10+
DB_NAME = os.getenv("POSTGRES_DB", "eventrelay")
11+
DB_HOST = os.getenv("POSTGRES_HOST", "LOCALHOST")
12+
DB_PORT = os.getenv("POSTGRES_PORT", 5432)
13+
14+
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

backend/app/db.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from databases import Database
2+
from app.core.config import DATABASE_URL
3+
4+
database = Database(DATABASE_URL)
5+
6+
async def connect_db():
7+
await database.connect()
8+
9+
async def disconnect_db():
10+
await database.disconnect()

backend/app/main.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from fastapi import FastAPI
22
from app.api.routes import events
3+
from app.db import connect_db, disconnect_db
4+
35

46
app = FastAPI(title="EventRelay")
57

@@ -8,3 +10,11 @@
810
@app.get("/health")
911
def health():
1012
return {"status": "ok"}
13+
14+
@app.on_event("startup")
15+
async def startup():
16+
await connect_db()
17+
18+
@app.on_event("shutdown")
19+
async def shutdown():
20+
await disconnect_db()

backend/app/models/event.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,4 @@
66
class EventIn(BaseModel):
77
source: str = Field(..., example="auth-service")
88
type: str = Field(..., example="user.login")
9-
payload: Dict = Field(..., example={"user_id": 123})
10-
timestamp: Optional[datetime] = Field(
11-
default_factory=datetime.utcnow
12-
)
9+
payload: Dict = Field(..., example={"user_id": 123})

0 commit comments

Comments
 (0)