Content-based recommender built with FastAPI: item vectors (sentence-transformers) and two user vectors (short- and long-term) that a worker updates incrementally from a Redis Stream. At read time, the two user vectors are blended with a recency alpha, L2-normalized, and ranked by cosine similarity against the catalog.
Pet project for end-to-end system design (not production-hardened).
| Piece | Role |
|---|---|
| API | Validates events, optional durable row in PostgreSQL, XADD to the stream; serves recommendations and debug endpoints. Does not update vectors on the event request path. |
| Worker | XREADGROUP on the stream, incremental short_term_vector / long_term_vector updates, XACK, periodic snapshots to PostgreSQL. |
| Redis | user:events:stream and recommend:user_state:{user_id} (JSON: vectors + last_event_ts). |
| PostgreSQL | Durable events and user_vector_snapshots for recovery (enable with POSTGRES_ENABLED=1, e.g. Docker Compose). |
- Client: POST /api/events
- API: may insert a durable row in PostgreSQL if POSTGRES_ENABLED, then XADD to the Redis stream, returns 200
- Worker: XREADGROUP from the stream, loads item embedding, applies short/long-term deltas, writes recommend:user_state in Redis
- Worker: snapshots to PostgreSQL every N events, then XACK
Consistency: eventual. Recommendations may lag until the worker processes events; new users can see zero vectors until the first event is applied.
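The worker's per-event update can be sketched as below. This is a minimal illustration, not the project's actual code: the exponential half-life decay, the default half-lives, and the helper names `decay_factor` and `apply_event` are assumptions. Only the state keys (short_term_vector, long_term_vector, last_event_ts) come from the description above.

```python
def decay_factor(dt_seconds: float, half_life_seconds: float) -> float:
    """Exponential decay weight: 1.0 at dt=0, 0.5 after one half-life."""
    return 0.5 ** (max(dt_seconds, 0.0) / half_life_seconds)


def apply_event(
    state: dict,
    item_vec: list[float],
    event_ts: float,
    short_half_life: float = 3600.0,        # hypothetical default: 1 hour
    long_half_life: float = 30 * 86400.0,   # hypothetical default: 30 days
) -> dict:
    """Return a new user state with one item embedding folded into both branches."""
    last_ts = state.get("last_event_ts")
    dt = 0.0 if last_ts is None else event_ts - last_ts
    new_state = {
        "last_event_ts": event_ts if last_ts is None else max(event_ts, last_ts)
    }
    branches = (
        ("short_term_vector", short_half_life),
        ("long_term_vector", long_half_life),
    )
    for key, half_life in branches:
        d = decay_factor(dt, half_life)
        # A user with no state yet starts from a zero vector.
        old = state.get(key) or [0.0] * len(item_vec)
        new_state[key] = [d * o + x for o, x in zip(old, item_vec)]
    return new_state
```

With these assumed half-lives, an event arriving one hour after the previous one halves the short-term vector before adding the new embedding, while the long-term vector is almost unchanged by the decay.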
- GET /api/recommendations/{user_id} loads user state from Redis
- alpha = recency_alpha(last_event_ts, now)
- raw = alpha * short_term + (1 - alpha) * long_term, then L2 normalize
- Cosine similarity against every item embedding, sort by score
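The read path above can be sketched with the standard library alone (the project itself lists NumPy). The blend formula and the L2 normalization follow the steps above; the shape of `recency_alpha` (a half-life decay on the age of the last event) and the `recommend` signature are assumptions made for illustration.

```python
import math


def recency_alpha(last_event_ts, now, half_life_seconds=3600.0):
    """Hypothetical alpha: 1.0 right after an event, decaying toward 0 as it ages."""
    if last_event_ts is None:
        return 0.0
    age = max(now - last_event_ts, 0.0)
    return 0.5 ** (age / half_life_seconds)


def l2_normalize(vec):
    """Scale a vector to unit length; zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def recommend(short_term, long_term, alpha, catalog, top_n=5):
    """Blend the two user vectors, then rank catalog items by cosine similarity."""
    raw = [alpha * s + (1 - alpha) * l for s, l in zip(short_term, long_term)]
    user = l2_normalize(raw)
    scored = []
    for item_id, embedding in catalog.items():
        item = l2_normalize(embedding)
        # Both vectors are unit length, so the dot product is the cosine.
        scored.append((item_id, sum(u * i for u, i in zip(user, item))))
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]
```

A user whose vectors are still all zeros gets a score of 0 for every item, which matches the note that new users see zero vectors until the first event is applied.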
For search events, the worker expands intent with top-K similar catalog items (SEARCH_* env vars).
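One way such an expansion could work is sketched below. The source only says that intent is expanded with the top-K similar catalog items, so the weighted sum of the query embedding and the neighbours' centroid is an assumption. The function name and the default values standing in for SEARCH_TOP_K, SEARCH_QUERY_ALPHA, and SEARCH_ITEM_BETA are hypothetical as well.

```python
import math


def _unit(vec):
    """Scale a vector to unit length; zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def expand_search_intent(query_vec, catalog, top_k=3, query_alpha=0.7, item_beta=0.3):
    """Return (neighbour ids, intent vector) for a search query embedding.

    Assumed formula: intent = query_alpha * query + item_beta * centroid(top-K items).
    """
    q = _unit(query_vec)
    scored = sorted(
        (
            (sum(a * b for a, b in zip(q, _unit(emb))), item_id)
            for item_id, emb in catalog.items()
        ),
        reverse=True,
    )
    neighbours = [item_id for _, item_id in scored[:top_k]]
    centroid = [0.0] * len(q)
    for item_id in neighbours:
        for i, x in enumerate(_unit(catalog[item_id])):
            centroid[i] += x / len(neighbours)
    intent = [query_alpha * qi + item_beta * ci for qi, ci in zip(q, centroid)]
    return neighbours, intent
```

The resulting intent vector could then be fed into the same short/long-term update used for item events.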
| Environment | Command |
|---|---|
| Docker | docker compose up --build then open http://localhost:8000/docs |
| Local | pip install -r requirements.txt then uvicorn app.main:app --reload and python -m app.workers.recommendation_worker |
Needs Python 3.11+, Redis; Docker stack includes PostgreSQL. First run may download the embedding model.
Demo seed: if Redis lacks recommend:demo_seed_done, the API seeds demo-accion, demo-ml, demo-nature via app/seed/seed_examples.py. To reset: delete that key and restart.
| Method | Path | Description |
|---|---|---|
| POST | /events | user_id, item_id, event_type, timestamp; search may include query |
| GET | /recommendations/{user_id} | Ranked items (cosine + names) |
| GET | /users | Users seen on the stream |
| GET | /users/{user_id}/events | Event history (debug) |
Catalog item ids: item-1 ... item-16 (app/storage/catalog.py). Schema: db/schema.sql.
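As an illustration of the event payload, the sketch below builds (but does not send) a POST request using only the standard library. Several details are assumptions: the /api prefix is taken from the event flow description, the timestamp is formatted as ISO 8601, and the event type "view" in the usage note is hypothetical. The helper name `build_event_request` is not part of the project.

```python
import json
import urllib.request
from datetime import datetime, timezone


def build_event_request(user_id, item_id, event_type, query=None,
                        base_url="http://localhost:8000"):
    """Build a POST request for the events endpoint without sending it."""
    payload = {
        "user_id": user_id,
        "item_id": item_id,
        "event_type": event_type,
        # Assumption: the API accepts an ISO 8601 timestamp.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    if query is not None:
        # Only search events are described as carrying a query.
        payload["query"] = query
    return urllib.request.Request(
        f"{base_url}/api/events",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

With the stack running, `urllib.request.urlopen(build_event_request("demo-ml", "item-1", "view"))` would submit the event; the interactive docs at http://localhost:8000/docs show the schema the API actually validates.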
| Variable | Purpose |
|---|---|
| REDIS_* | Redis connection (redis host under Compose) |
| POSTGRES_*, POSTGRES_ENABLED | Durable SQL path |
| SNAPSHOT_EVERY_EVENTS, RECOVERY_SNAPSHOT_LIMIT | Snapshot cadence and recovery cap |
| RECENCY_ALPHA_* | Short/long blend at recommendation time |
| EMBEDDING_*_DECAY_* | Time decay per branch in the worker |
| SEARCH_TOP_K, SEARCH_QUERY_ALPHA, SEARCH_ITEM_BETA | Search event behavior |
Ops hint: if Redis user state is lost with PostgreSQL enabled, restart the worker to reload snapshots and continue consuming the stream; watch consumer lag with XINFO GROUPS.
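For scripted lag checks, the reply to XINFO GROUPS can be reduced to the two fields that matter most. The sketch assumes the reply has already been parsed into a list of dicts, as redis-py's `xinfo_groups()` returns it. The helper name is hypothetical, and the `lag` field is only reported by Redis 7.0 and later.

```python
def summarize_lag(groups):
    """Map each consumer group name to its pending count and lag.

    `groups` is a list of dicts such as the one returned by
    redis.Redis(...).xinfo_groups("user:events:stream").
    """
    summary = {}
    for group in groups:
        name = group["name"]
        if isinstance(name, bytes):
            # Names arrive as bytes unless the client decodes responses.
            name = name.decode()
        summary[name] = {
            "pending": group.get("pending"),
            "lag": group.get("lag"),  # None on servers older than Redis 7.0
        }
    return summary
```

A steadily growing lag or pending count suggests the worker is down or falling behind the stream.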
- Learned user updates: replace or augment the hand-tuned decay and weights with a trained module (sequence models, two-tower update head, contrastive learning) while keeping the same stream/worker shape.
- ANN retrieval: retrieval is currently a full-catalog cosine scan; add FAISS, HNSW, or a similar index for large catalogs and lower latency.
- Production hygiene: structured metrics, lag alerts, rate limits, event schema versioning.
FastAPI, Redis (Streams + JSON state), sentence-transformers, NumPy, Docker Compose, PostgreSQL (events + snapshots).