forked from pewdiepie-archdaemon/odysseus
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
1071 lines (928 loc) · 44.5 KB
/
Copy pathapp.py
File metadata and controls
1071 lines (928 loc) · 44.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# app.py — slim orchestrator
import mimetypes
import os
def register_static_mime_types() -> None:
    """Force stable JavaScript MIME types across platforms.

    Some native Windows setups inherit stale/incorrect registry mappings for
    ``.js``/``.mjs``, which can make Starlette serve ES modules with a non-JS
    ``Content-Type`` and cause the UI to load but fail on click. Re-register
    the standard type at startup so static assets are served consistently.

    Both extensions map to ``text/javascript``: RFC 9239 registers it as the
    JavaScript media type for ``.js`` and ``.mjs`` alike and marks
    ``application/javascript`` obsolete.
    """
    for extension in (".js", ".mjs"):
        mimetypes.add_type("text/javascript", extension)


register_static_mime_types()
# Windows: force HuggingFace/fastembed to COPY model files instead of symlinking.
# On a network-share/UNC data dir Windows can't follow HF's symlinks ([WinError
# 1463]), so the ONNX embedding model fails to load. huggingface_hub reads this
# at import time, so set it before anything pulls it in. (Mirrored in
# src/embeddings.py for non-server entrypoints.)
if os.name == "nt":
    # Fill in only the flags the operator has not already set, so an explicit
    # value in the environment always wins.
    os.environ.update({
        flag: "1"
        for flag in ("HF_HUB_DISABLE_SYMLINKS", "HF_HUB_DISABLE_SYMLINKS_WARNING")
        if flag not in os.environ
    })
from dotenv import load_dotenv
# encoding="utf-8-sig" tolerates a UTF-8 BOM in .env — a common Windows gotcha
# when the file is saved from Notepad. Without this, the first key parses as
# "\ufeffAUTH_ENABLED" (BOM-prefixed) instead of "AUTH_ENABLED", so AUTH_ENABLED=false (etc.)
# is silently ignored and the user is unexpectedly forced to log in (issue #142).
# utf-8-sig reads plain UTF-8 (no BOM) identically, so this is safe everywhere.
load_dotenv(encoding="utf-8-sig")
import uuid
import asyncio
import logging
import secrets
from datetime import datetime
from typing import Dict
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
# Core imports
from core.constants import (
BASE_DIR, STATIC_DIR, SESSIONS_FILE,
REQUEST_TIMEOUT, OPENAI_API_KEY,
)
from core.database import SessionLocal, ApiToken
from core.middleware import SecurityHeadersMiddleware
from core.auth import AuthManager
from core.exceptions import (
SessionNotFoundError, InvalidFileUploadError,
LLMServiceError, WebSearchError,
)
import bcrypt as _bcrypt
from src.app_helpers import abs_join
from starlette.responses import RedirectResponse
# ========= LOGGING =========
# Configure the root logger once for the whole process: INFO level, with the
# timestamp, logger name and level prefixed to every message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# ========= APP =========
# Lifespan is defined below (after all helpers it references are in scope)
# and passed to FastAPI so we can use the modern context-manager lifecycle
# instead of the deprecated @app.on_event("startup"/"shutdown") decorators.
# The ASGI application object; every middleware, mount and router below is
# attached to it.
# NOTE(review): no ``lifespan=`` argument is passed in this call even though
# the comment above says the lifespan is passed to FastAPI — confirm it is
# attached further down the file.
app = FastAPI(
title="AI Chat Application",
description="Comprehensive AI chat with memory, research, and multi-modal capabilities",
version="1.0.0",
)
# ========= CORS =========
# Comma-separated list from ALLOWED_ORIGINS. Each entry is stripped and empty
# entries are dropped: CORS origin matching is an exact string comparison, so
# "http://a, http://b" must not produce " http://b" (which would never match a
# browser's Origin header), and a trailing comma must not produce "".
allowed_origins = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "http://localhost,http://127.0.0.1").split(",")
    if origin.strip()
]
# Register CORS handling for the origins listed in ``allowed_origins``.
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
# Request headers a cross-origin caller may send. Besides the generic ones
# this lists the app's own auth/attribution headers (X-API-Key, X-Auth-Token,
# X-Odysseus-Internal-Token, X-Odysseus-Owner) and X-TZ-Offset.
allow_headers=[
"Accept",
"Authorization",
"Content-Type",
"X-API-Key",
"X-Auth-Token",
"X-Odysseus-Internal-Token",
"X-Odysseus-Owner",
"X-Requested-With",
"X-TZ-Offset",
],
)
# ========= SECURITY HEADERS MIDDLEWARE =========
# Implemented in core.middleware (not visible in this file).
app.add_middleware(SecurityHeadersMiddleware)
# ========= REQUEST TIMEOUT (FALLBACK FOR HUNG HANDLERS) =========
# If a single request takes longer than REQUEST_HARD_TIMEOUT, abort it and
# return 504 instead of holding the event loop hostage. Whitelisted paths
# (streaming, long-running shell exec, research) are exempt because they
# legitimately stay open. Without this, a single hung subprocess.run or
# missing-timeout httpx call locks up the entire server for everyone.
import asyncio as _asyncio
from starlette.middleware.base import BaseHTTPMiddleware as _BaseHTTPMiddleware
from starlette.responses import JSONResponse as _JSONResponse
# Hard per-request deadline in seconds; override with the REQUEST_HARD_TIMEOUT
# environment variable (default "45"). The value is parsed with float(), so a
# non-numeric setting raises at import time.
REQUEST_HARD_TIMEOUT = float(os.getenv("REQUEST_HARD_TIMEOUT", "45"))
# Path prefixes that skip the deadline. The middleware matches them with
# str.startswith, so each entry exempts every path that begins with it.
_TIMEOUT_EXEMPT_PREFIXES = (
"/api/chat", # streaming
"/api/shell/stream", # SSE
"/api/research", # multi-minute jobs
"/api/model/download", # tmux setup may run pip installs
"/api/model/probe", # SSE; iterates models with up to 8s timeout each
"/api/model-endpoints", # /probe sub-route also iterates models
"/api/cookbook/setup", # remote pacman/apt installs
"/api/upload", # large files
"/api/image", # diffusion proxies (inpaint/harmonize/upscale/etc.) — own 120s httpx timeout
)
class _RequestTimeoutMiddleware(_BaseHTTPMiddleware):
    """Abort non-exempt requests that outlive ``REQUEST_HARD_TIMEOUT``.

    Paths beginning with any entry of ``_TIMEOUT_EXEMPT_PREFIXES`` are passed
    straight through. Everything else is wrapped in ``asyncio.wait_for`` and
    answered with a 504 JSON body when the deadline expires.
    """

    async def dispatch(self, request, call_next):
        url_path = request.url.path or ""
        # str.startswith accepts the whole tuple of prefixes in one call.
        if not url_path.startswith(_TIMEOUT_EXEMPT_PREFIXES):
            try:
                return await _asyncio.wait_for(
                    call_next(request), timeout=REQUEST_HARD_TIMEOUT
                )
            except _asyncio.TimeoutError:
                return _JSONResponse(
                    {"detail": f"Request exceeded {REQUEST_HARD_TIMEOUT:.0f}s timeout"},
                    status_code=504,
                )
        return await call_next(request)


app.add_middleware(_RequestTimeoutMiddleware)
# ========= AUTH =========
from routes.auth_routes import setup_auth_routes, SESSION_COOKIE
auth_manager = AuthManager()
app.state.auth_manager = auth_manager

# Auth stays on unless AUTH_ENABLED is literally "false" (any letter case).
AUTH_ENABLED = not (os.getenv("AUTH_ENABLED", "true").lower() == "false")
# The loopback bypass is opt-in: only the literal "true" (any case) enables it.
LOCALHOST_BYPASS = "true" == os.getenv("LOCALHOST_BYPASS", "false").lower()
if LOCALHOST_BYPASS:
    logger.warning(
        "LOCALHOST_BYPASS is enabled, loopback requests bypass authentication. Do not expose this instance to a network."
    )
if AUTH_ENABLED:
# Paths reachable without any credential: the auth bootstrap endpoints, the
# health/version probes and the login page itself.
AUTH_EXEMPT_EXACT = {
    "/login",
    "/api/health",
    "/api/version",
    "/api/auth/setup",
    "/api/auth/signup",
    "/api/auth/login",
    "/api/auth/logout",
    "/api/auth/status",
    "/api/auth/features",
    "/api/auth/settings",
    "/api/auth/integrations/presets",
}
AUTH_EXEMPT_PREFIXES = ["/static"]
# Dynamic paths whose own handler proves identity via a path-embedded
# secret instead of the session/bearer auth. The route handler at
# routes/task_routes.py validates the per-task `webhook_token` itself
# and returns 404 on mismatch, so the path is the credential — the
# UI labels these URLs "no auth needed" precisely because external
# callers (Zapier, n8n, curl) can't supply a session cookie. Without
# this exemption AuthMiddleware rejects every POST with 401 before
# the token is ever checked.
import re as _re
AUTH_EXEMPT_PATTERNS = [
    _re.compile(r"^/api/tasks/[^/]+/webhook/[^/]+/?$"),
]


def _is_auth_exempt(path: str) -> bool:
    """Return True when *path* may be served without authentication.

    A path is exempt when it is listed in ``AUTH_EXEMPT_EXACT``, begins with
    one of ``AUTH_EXEMPT_PREFIXES``, or matches one of
    ``AUTH_EXEMPT_PATTERNS``.
    """
    if path in AUTH_EXEMPT_EXACT or path.startswith(tuple(AUTH_EXEMPT_PREFIXES)):
        return True
    for pattern in AUTH_EXEMPT_PATTERNS:
        if pattern.match(path):
            return True
    return False
# In-memory token cache: prefix → list[(token_id, token_hash, owner, scopes)]. The DB
# query was running on every API-bearer request and scanning bcrypt
# checks linearly. With this cache, we hit the DB only when the cache
# version bumps (token created/revoked) — see _token_cache_invalidate
# in app.state, called by routes/api_token_routes.
# Shared prefix → candidate-list map. _refresh_token_cache mutates it in place
# and AuthMiddleware reads it by token prefix.
_token_cache: dict = {}
# Held around the rebuild so concurrent requests trigger at most one refresh.
_token_cache_lock = _asyncio.Lock()
# NOTE(review): the code visible here reads and writes the dirty flag on
# app.state, not this module-level name — confirm whether it is still needed.
_token_cache_dirty = True
def _token_cache_invalidate():
    """Mark the API-token cache stale so the next bearer request rebuilds it.

    The flag is set through ordinary attribute assignment on ``app.state``.
    Starlette's ``State`` keeps its values in an internal ``_state`` mapping
    behind ``__setattr__``/``__getattr__``. Writing into
    ``app.state.__dict__`` instead creates a real instance attribute that
    shadows that mapping: a later ``app.state._token_cache_dirty = False``
    would then never be observed, and the cache would be rebuilt from the
    database on every bearer-token request.
    """
    app.state._token_cache_dirty = True
# Expose the invalidation hook and the cache itself on app.state so code
# outside this module can reach them (the comment above names
# routes/api_token_routes as the caller).
app.state.invalidate_token_cache = _token_cache_invalidate
app.state._token_cache = _token_cache
# Start dirty so the first bearer-token request populates the cache.
app.state._token_cache_dirty = True
def _refresh_token_cache():
    """Rebuild the token cache from the active ``ApiToken`` rows.

    The cache maps ``token_prefix`` to a list of
    ``(id, token_hash, owner, scopes)`` tuples. ``scopes`` is the row's
    comma-separated ``scopes`` value split into a list; a missing or empty
    value is treated as ``"chat"``.

    The dirty flag is cleared only if no invalidation arrived while the
    rebuild was running. A token created or revoked after the query executed
    is not part of the snapshot just loaded, so clearing the flag
    unconditionally would leave the cache stale until the next invalidation.
    """
    # Truthy marker: anything checking the flag still sees "dirty" during the
    # rebuild, and an invalidation replaces the marker with a plain True.
    in_flight = object()
    app.state._token_cache_dirty = in_flight

    fresh = {}
    db = SessionLocal()
    try:
        # `== True` is deliberate: it builds a SQL expression on the column.
        rows = db.query(ApiToken).filter(ApiToken.is_active == True).all()  # noqa: E712
        for row in rows:
            raw_scopes = getattr(row, "scopes", "") or "chat"
            scopes = [s.strip() for s in raw_scopes.split(",") if s.strip()]
            fresh.setdefault(row.token_prefix, []).append(
                (row.id, row.token_hash, getattr(row, "owner", None), scopes)
            )
    finally:
        db.close()

    # Mutate in place: app.state._token_cache refers to this same dict object.
    _token_cache.clear()
    _token_cache.update(fresh)
    if app.state._token_cache_dirty is in_flight:
        app.state._token_cache_dirty = False
# Headers that prove a request was forwarded by a proxy/tunnel (cloudflared,
# nginx, Caddy, Tailscale Funnel, …). cloudflared connects to the app FROM
# 127.0.0.1, so without this check every tunneled request would look like
# loopback and could bypass auth.
_PROXY_FWD_HEADERS = (
    "cf-connecting-ip", "cf-ray", "cf-visitor",
    "x-forwarded-for", "x-forwarded-host", "x-real-ip", "forwarded",
)


def _is_trusted_loopback(request: Request) -> bool:
    """Tell whether *request* is a direct loopback call, not a proxied one.

    Returns True only when the peer address is ``127.0.0.1`` or ``::1`` AND
    none of the ``_PROXY_FWD_HEADERS`` carries a non-empty value. Checking the
    peer address alone is not enough behind a tunnel or reverse proxy: those
    connect from loopback too, so a remote visitor would inherit local trust.
    """
    client = request.client
    peer = client.host if client else None
    if peer not in ("127.0.0.1", "::1"):
        return False
    # Any non-empty forwarding header means something relayed this request.
    return not any(request.headers.get(name) for name in _PROXY_FWD_HEADERS)
class AuthMiddleware(BaseHTTPMiddleware):
    """Gate every non-exempt request behind one of the supported credentials.

    Checks run in this order; the first one that applies decides the outcome:

    1. Exempt paths (``_is_auth_exempt``) pass straight through.
    2. Internal-tool token sent by a trusted loopback client.
    3. ``LOCALHOST_BYPASS`` for trusted loopback clients.
    4. No users configured yet: pages redirect to ``/login``, API paths get 401.
    5. ``Authorization: Bearer ody_…`` API tokens.
    6. Session cookie.

    The downstream handler is always invoked OUTSIDE the ``try`` blocks that
    guard credential checks. An exception raised by a route therefore
    propagates normally instead of being swallowed and reported as an
    authentication failure (or triggering a second pass through the checks).
    """

    # Strong references to fire-and-forget tasks. The event loop keeps only
    # weak references to tasks, so an unreferenced one may be garbage-collected
    # before it finishes.
    _background_tasks: set = set()

    async def dispatch(self, request: Request, call_next):
        path = request.url.path
        if _is_auth_exempt(path):
            return await call_next(request)

        # In-process internal-tool token bypass. Used by the agent tool layer
        # when it HTTP-loopbacks to admin-gated routes (no admin cookie is
        # available in that context).
        if self._accept_internal_tool(request):
            return await call_next(request)

        # Allow DIRECT localhost requests (internal service calls from
        # heartbeats etc.). Tunnel/proxy-forwarded requests are excluded by
        # _is_trusted_loopback so LOCALHOST_BYPASS can't be abused over a
        # Cloudflare tunnel / reverse proxy. Keep LOCALHOST_BYPASS=false for
        # network-exposed deployments regardless.
        if LOCALHOST_BYPASS and _is_trusted_loopback(request):
            return await call_next(request)

        if not auth_manager.is_configured:
            # No users yet — redirect to login for first-time setup
            if not path.startswith("/api/"):
                return RedirectResponse(url="/login", status_code=302)
            return JSONResponse(status_code=401, content={"error": "Setup required"})

        # --- Bearer token auth (API tokens for external integrations) ---
        auth_header = request.headers.get("authorization", "")
        if auth_header.startswith("Bearer ody_"):
            # Strip the 7-character "Bearer " scheme prefix.
            if not await self._accept_bearer_token(request, auth_header[7:]):
                # Invalid bearer token — reject immediately
                return JSONResponse(status_code=401, content={"error": "Invalid API token"})
            return await call_next(request)

        # --- Cookie-based session auth ---
        token = request.cookies.get(SESSION_COOKIE)
        if not auth_manager.validate_token(token):
            if path.startswith("/api/"):
                return JSONResponse(status_code=401, content={"error": "Not authenticated"})
            return RedirectResponse(url="/login", status_code=302)
        # Attach current username to request state for downstream routes
        request.state.current_user = auth_manager.get_username_for_token(token)
        request.state.api_token = False
        return await call_next(request)

    def _accept_internal_tool(self, request: Request) -> bool:
        """Validate the internal-tool header; on success fill ``request.state``.

        Accepted only when the header matches the configured token AND the
        request is a trusted loopback call. Any error while checking means
        "this credential does not apply": the caller falls through to the
        remaining auth methods.
        """
        try:
            from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN as _ITT
            _hdr = request.headers.get(INTERNAL_TOOL_HEADER)
            if not (_hdr and secrets.compare_digest(_hdr, _ITT) and _is_trusted_loopback(request)):
                return False
            # Impersonation: when the agent's loopback call sets
            # X-Odysseus-Owner, attribute the request to that user only
            # if they exist. Authorization checks remain separate; this
            # is just owner attribution for notes/calendar/etc.
            _impersonate = (request.headers.get("X-Odysseus-Owner") or "").strip()
            _auth_mgr = getattr(request.app.state, "auth_manager", None) or auth_manager
            if _impersonate and _impersonate in getattr(_auth_mgr, "users", {}):
                request.state.current_user = _impersonate
            else:
                request.state.current_user = "internal-tool"
            request.state.api_token = False
            return True
        except Exception:
            # Deliberately best-effort (e.g. compare_digest raises TypeError on
            # non-ASCII text); keep a trace at debug level for diagnosis.
            logger.debug("Internal-tool token check failed", exc_info=True)
            return False

    async def _accept_bearer_token(self, request: Request, raw_token: str) -> bool:
        """Check *raw_token* against the token cache; on a match fill
        ``request.state`` and schedule the ``last_used_at`` update.

        Returns False for a malformed or unknown token and when the lookup
        itself fails.
        """
        # Sanity check: tokens are "ody_" + 43 chars of base64
        if len(raw_token) < 12 or len(raw_token) > 100:
            return False
        prefix = raw_token[:8]
        matched_id = None
        matched_owner = None
        matched_scopes = []
        try:
            # Re-check the flag under the lock so that requests queued behind
            # a rebuild do not each run their own.
            if app.state._token_cache_dirty:
                async with _token_cache_lock:
                    if app.state._token_cache_dirty:
                        await _asyncio.to_thread(_refresh_token_cache)
            candidates = list(_token_cache.get(prefix, ()))
            # NOTE(review): checkpw runs inline in this coroutine, i.e. on the
            # event loop — confirm that cost is acceptable.
            for tid, thash, owner, scopes in candidates:
                if _bcrypt.checkpw(raw_token.encode(), thash.encode()):
                    matched_id = tid
                    matched_owner = owner
                    matched_scopes = scopes or []
                    break
        except Exception:
            logger.warning("API token auth error", exc_info=True)
            return False
        if not matched_id:
            return False

        # Update last_used_at off the hot path (fire-and-forget) so the
        # request is not held open across an extra commit.
        self._touch_last_used(matched_id)
        # Keep bearer-token callers out of normal cookie/user
        # routes. API-aware routes can read api_token_owner.
        request.state.current_user = "api"
        request.state.api_token = True
        request.state.api_token_id = matched_id
        request.state.api_token_owner = matched_owner
        request.state.api_token_scopes = matched_scopes
        return True

    def _touch_last_used(self, token_id) -> None:
        """Record ``last_used_at`` for *token_id* in a background task."""
        from datetime import timezone

        def _write():
            _db = SessionLocal()
            try:
                # Naive UTC value, identical to what datetime.utcnow()
                # produced; utcnow() itself is deprecated since Python 3.12.
                _db.query(ApiToken).filter(ApiToken.id == token_id).update(
                    {"last_used_at": datetime.now(timezone.utc).replace(tzinfo=None)}
                )
                _db.commit()
            finally:
                _db.close()

        async def _run():
            try:
                await _asyncio.to_thread(_write)
            except Exception:
                # Bookkeeping only — it must never affect the request.
                logger.debug("Could not update last_used_at for API token", exc_info=True)

        task = _asyncio.create_task(_run())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)


app.add_middleware(AuthMiddleware)
logger.info("Auth middleware enabled (AUTH_ENABLED=true)")
else:
logger.info("Auth middleware disabled (set AUTH_ENABLED=true to enable)")
# ========= STATIC FILES =========
os.makedirs(STATIC_DIR, exist_ok=True)


class _RevalidatingStatic(StaticFiles):
    """Serve static assets normally, but force the browser to REVALIDATE
    source files (.js/.mjs/.css/.html) on every load instead of serving a
    stale copy from disk cache. The app ships raw ES modules with no build
    step or versioned URLs, so browsers were caching modules across deploys —
    a code change wouldn't appear without a manual hard-refresh. `no-cache`
    keeps the cached bytes but requires a conditional request; unchanged files
    still return a cheap 304 (ETag/Last-Modified are preserved)."""

    async def get_response(self, path, scope):
        resp = await super().get_response(path, scope)
        # ".mjs" is listed alongside ".js": this file registers a MIME type
        # for ".mjs" modules, so they need the same revalidation.
        if path.endswith((".js", ".mjs", ".css", ".html")):
            resp.headers["Cache-Control"] = "no-cache"
        return resp


# NOTE(review): the directory created above is STATIC_DIR, but the mount uses
# the relative path "static" (resolved against the process working directory)
# — confirm both refer to the same location.
app.mount("/static", _RevalidatingStatic(directory="static"), name="static")
# ========= GENERATED IMAGES =========
@app.get("/api/generated-image/{filename}")
async def serve_generated_image(filename: str, request: Request):
    """Serve a generated image or video from ``data/generated_images``.

    Args:
        filename: 8-64 lowercase hex characters plus one of the allowed
            image/video extensions. Anything else is rejected, which also
            rules out path separators.
        request: Used to resolve the current user for the ownership check.

    Raises:
        HTTPException: 400 for a malformed filename; 404 when the file is
            missing or a gallery row assigns it to a different owner.

    The ownership check is best-effort: when no user can be resolved, or the
    lookup itself fails, the file is still served (a failure is logged).
    """
    from pathlib import Path
    import re

    # fullmatch (rather than match with "$") also rejects a trailing newline.
    if not re.fullmatch(r'[a-f0-9]{8,64}\.(png|jpg|jpeg|webp|gif|mp4|mov|webm|mkv|m4v)', filename):
        raise HTTPException(status_code=400, detail="Invalid filename")
    img_path = Path("data/generated_images") / filename
    # is_file() so that a directory with a matching name is a 404, not an
    # error while streaming the response.
    if not img_path.is_file():
        raise HTTPException(status_code=404, detail="Image not found")

    # SECURITY: filename is the only key, so anyone who knows / guesses a
    # content hash could pull another user's image bytes. Verify ownership
    # via the gallery row (when one exists).
    try:
        from src.auth_helpers import get_current_user
        from core.database import SessionLocal as _SL, GalleryImage as _GI
        _user = get_current_user(request)
        if _user:
            _db = _SL()
            try:
                _row = _db.query(_GI).filter(_GI.filename == filename).first()
                # Generated-but-not-yet-imported images have no row → allow.
                # Row exists with a different owner → 404 (don't confirm existence).
                if _row is not None and _row.owner and _row.owner != _user:
                    raise HTTPException(status_code=404, detail="Image not found")
            finally:
                _db.close()
    except HTTPException:
        raise
    except Exception:
        # Still fail-open as before, but no longer silently.
        logger.warning(
            "Ownership check failed for generated image %s; serving without it",
            filename,
            exc_info=True,
        )

    ext = filename.rsplit('.', 1)[-1].lower()
    mime = {
        "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg",
        "webp": "image/webp", "gif": "image/gif",
        "mp4": "video/mp4", "mov": "video/quicktime", "webm": "video/webm",
        "mkv": "video/x-matroska", "m4v": "video/mp4",
    }.get(ext, "application/octet-stream")
    # Generated-image filenames are content hashes → the bytes for a given
    # filename never change. Cache them hard so the gallery doesn't
    # re-download every full-size image each time it's opened. `immutable`
    # tells the browser it never needs to revalidate within the max-age.
    # `private` (not `public`): the response is owner-restricted, so shared
    # caches (CDN, tunnel, reverse proxy) must not store it and replay it to
    # another user without the ownership check above.
    return FileResponse(
        str(img_path),
        media_type=mime,
        headers={"Cache-Control": "private, max-age=31536000, immutable"},
    )
# ========= YOUTUBE INIT =========
from services.youtube import init_youtube
# Runs once at import time; what it sets up is defined in services.youtube
# (not visible in this file).
init_youtube()
# ========= RAG (vector document RAG) =========
# VectorRAG (ChromaDB-backed personal-document semantic search). Initialized
# lazily via get_rag_manager() — returns None if ChromaDB isn't reachable
# (no server running on the configured host:port), in which case personal-doc
# routes return a clean 503 instead of busy-retrying every request.
#
# Note: this was previously hardcoded off because chromadb 1.4.1 / pydantic
# 2.12 were mutually incompatible at the time. With the current pins
# (chromadb 1.5.x + pydantic 2.13.x) the init works and Personal Docs
# (POST /api/personal/add_directory etc.) is functional again.
from src.rag_singleton import get_rag_manager
rag_manager = get_rag_manager()
# Captured once at startup and handed to the routers registered below.
rag_available = rag_manager is not None
logger.info(
    "Vector document RAG initialized"
    if rag_available
    else (
        "Vector document RAG not available at startup "
        "(ChromaDB may not be reachable yet — routes will retry lazily)"
    )
)
# ========= IMPORT CONFIG =========
from src.config import config
# ========= COMPONENT INITIALIZATION =========
from src.app_initializer import initialize_managers
# Build every manager/handler in one call; the names below are then pulled out
# of the returned mapping for the router setup further down.
components = initialize_managers(BASE_DIR, rag_manager)
session_manager = components["session_manager"]
# Hand the session manager to the assistant log module right away.
from src.assistant_log import set_session_manager as _set_asst_sm
_set_asst_sm(session_manager)
memory_manager = components["memory_manager"]
# Read with .get(), unlike the other entries: a missing key yields None
# instead of raising — presumably this component is optional.
memory_vector = components.get("memory_vector")
upload_handler = components["upload_handler"]
personal_docs_mgr = components["personal_docs_manager"]
api_key_manager = components["api_key_manager"]
preset_manager = components["preset_manager"]
chat_processor = components["chat_processor"]
research_handler = components["research_handler"]
chat_handler = components["chat_handler"]
model_discovery = components["model_discovery"]
skills_manager = components["skills_manager"]
# TTS
from services.tts import get_tts_service
tts_service = get_tts_service()
logger.info("TTS service initialized (provider managed via admin settings)")
# ========= EXCEPTION HANDLERS =========
@app.exception_handler(SessionNotFoundError)
async def session_not_found_handler(request: Request, exc: SessionNotFoundError):
    """Translate ``SessionNotFoundError`` into a 404 JSON error payload."""
    payload = {"error": "SESSION_NOT_FOUND", "message": str(exc)}
    return JSONResponse(content=payload, status_code=404)
@app.exception_handler(InvalidFileUploadError)
async def invalid_file_upload_handler(request: Request, exc: InvalidFileUploadError):
    """Translate ``InvalidFileUploadError`` into a 400 JSON error payload."""
    payload = {"error": "INVALID_FILE_UPLOAD", "message": str(exc)}
    return JSONResponse(content=payload, status_code=400)
@app.exception_handler(LLMServiceError)
async def llm_service_error_handler(request: Request, exc: LLMServiceError):
    """Translate ``LLMServiceError`` into a 502 JSON error payload."""
    payload = {"error": "LLM_SERVICE_ERROR", "message": str(exc)}
    return JSONResponse(content=payload, status_code=502)
@app.exception_handler(WebSearchError)
async def web_search_error_handler(request: Request, exc: WebSearchError):
    """Translate ``WebSearchError`` into a 502 JSON error payload."""
    payload = {"error": "WEB_SEARCH_ERROR", "message": str(exc)}
    return JSONResponse(content=payload, status_code=502)
# ========= WEBHOOK MANAGER =========
from src.webhook_manager import WebhookManager
# Created before the routers because the session, chat and webhook routers
# below all receive it.
webhook_manager = WebhookManager(api_key_manager=api_key_manager)
# ========= INCLUDE ROUTERS =========
# Each section imports a setup_* factory, builds its router with the managers
# it needs, and attaches it to the app. Imports are kept next to their use.
# Auth
auth_router = setup_auth_routes(auth_manager)
app.include_router(auth_router)
# Uploads
from routes.upload_routes import setup_upload_routes
upload_router, upload_cleanup_func = setup_upload_routes(upload_handler)
app.include_router(upload_router)
# NOTE(review): placeholder only — nothing in the visible code assigns a task
# here or calls upload_cleanup_func; presumably that happens at startup.
upload_cleanup_task = None
# Emoji SVG proxy (same-origin, lazy-cached Twemoji) — lets the chat render
# emojis as flat SVG instead of system color glyphs.
from routes.emoji_routes import setup_emoji_routes
app.include_router(setup_emoji_routes())
# Sessions
from routes.session_routes import setup_session_routes
session_config = {"REQUEST_TIMEOUT": REQUEST_TIMEOUT, "OPENAI_API_KEY": OPENAI_API_KEY, "SESSIONS_FILE": SESSIONS_FILE}
app.include_router(setup_session_routes(session_manager, session_config, webhook_manager=webhook_manager))
# Admin Danger Zone wipes (Settings → System → Danger Zone)
from routes.admin_wipe_routes import setup_admin_wipe_routes
app.include_router(setup_admin_wipe_routes(session_manager))
# Memory
from routes.memory_routes import setup_memory_routes
app.include_router(setup_memory_routes(memory_manager, session_manager, memory_vector=memory_vector))
# Skills
from routes.skills_routes import setup_skills_routes
app.include_router(setup_skills_routes(skills_manager))
# Chat
from routes.chat_routes import setup_chat_routes
app.include_router(setup_chat_routes(
session_manager, chat_handler, chat_processor,
memory_manager, research_handler, upload_handler,
memory_vector=memory_vector,
webhook_manager=webhook_manager,
skills_manager=skills_manager,
))
# Research (background deep-research tasks)
from routes.research_routes import setup_research_routes
app.include_router(setup_research_routes(research_handler, session_manager=session_manager))
# History
from routes.history_routes import setup_history_routes
app.include_router(setup_history_routes(session_manager))
# Search
from routes.search_routes import setup_search_routes
app.include_router(setup_search_routes(config))
# Presets
from routes.preset_routes import setup_preset_routes
app.include_router(setup_preset_routes(preset_manager))
# Diagnostics
from routes.diagnostics_routes import setup_diagnostics_routes
app.include_router(setup_diagnostics_routes(rag_manager, rag_available, research_handler))
# Cleanup
from routes.cleanup_routes import setup_cleanup_routes
app.include_router(setup_cleanup_routes(session_manager))
# Personal docs
from routes.personal_routes import setup_personal_routes
app.include_router(setup_personal_routes(personal_docs_mgr, rag_manager, rag_available))
# Embedding model management
from routes.embedding_routes import setup_embedding_routes
app.include_router(setup_embedding_routes())
# Models
from routes.model_routes import setup_model_routes
app.include_router(setup_model_routes(model_discovery))
# TTS
from routes.tts_routes import setup_tts_routes
app.include_router(setup_tts_routes(tts_service))
# STT
from services.stt import get_stt_service
stt_service = get_stt_service()
from routes.stt_routes import setup_stt_routes
app.include_router(setup_stt_routes(stt_service))
logger.info("STT service initialized (provider managed via settings)")
# Documents (artifacts/canvas)
from routes.document_routes import setup_document_routes
app.include_router(setup_document_routes(session_manager, upload_handler))
# Signatures (reusable image stamps)
from routes.signature_routes import setup_signature_routes
app.include_router(setup_signature_routes())
# Gallery (image library)
from routes.gallery_routes import setup_gallery_routes
app.include_router(setup_gallery_routes())
# Persisted image-editor drafts (server-backed projects)
from routes.editor_draft_routes import setup_editor_draft_routes
app.include_router(setup_editor_draft_routes())
# Scheduled tasks + event bus
from src.task_scheduler import TaskScheduler
task_scheduler = TaskScheduler(session_manager)
from src.event_bus import set_task_scheduler
set_task_scheduler(task_scheduler)
from routes.task_routes import setup_task_routes
app.include_router(setup_task_routes(task_scheduler))
# Assistant (shares the task scheduler)
from routes.assistant_routes import setup_assistant_routes
app.include_router(setup_assistant_routes(task_scheduler))
# Calendar (CalDAV)
from routes.calendar_routes import setup_calendar_routes
app.include_router(setup_calendar_routes())
# Shell (user-facing command execution)
from routes.shell_routes import setup_shell_routes
app.include_router(setup_shell_routes())
# Cookbook (model download/serve/cache, cookbook state sync)
from routes.cookbook_routes import setup_cookbook_routes
app.include_router(setup_cookbook_routes())
# Hardware model fitting (cookbook "What Fits?" tab)
from routes.hwfit_routes import setup_hwfit_routes
app.include_router(setup_hwfit_routes())
# Model A/B Comparison
from routes.compare_routes import setup_compare_routes
app.include_router(setup_compare_routes(session_manager))
# User Preferences
from routes.prefs_routes import setup_prefs_routes
app.include_router(setup_prefs_routes())
# Backup (export/import user data)
from routes.backup_routes import setup_backup_routes
app.include_router(setup_backup_routes(memory_manager, preset_manager, skills_manager))
# Fonts
from routes.font_routes import setup_font_routes
app.include_router(setup_font_routes())
# MCP (Model Context Protocol)
from src.mcp_manager import McpManager
from src.agent_tools import set_mcp_manager
from routes.mcp_routes import setup_mcp_routes
mcp_manager = McpManager()
set_mcp_manager(mcp_manager)
app.include_router(setup_mcp_routes(mcp_manager))
logger.info("MCP routes initialized")
# AI Interaction tools (debates, pipelines, self-managing AI, UI control)
from src.ai_interaction import set_session_manager as set_ai_session_manager, set_memory_manager as set_ai_memory_manager, set_rag_manager as set_ai_rag_manager
set_ai_session_manager(session_manager)
set_ai_memory_manager(memory_manager, memory_vector)
set_ai_rag_manager(rag_manager, personal_docs_mgr)
logger.info("AI interaction tools initialized (session, memory, RAG, UI control)")
# Webhooks
from routes.webhook_routes import setup_webhook_routes
app.include_router(setup_webhook_routes(webhook_manager, auth_manager, session_manager, api_key_manager))
# API Tokens
from routes.api_token_routes import setup_api_token_routes
app.include_router(setup_api_token_routes())
logger.info("Webhook & API token routes initialized")
# Notes (Google Keep-style notes/todos)
from routes.note_routes import setup_note_routes
app.include_router(setup_note_routes(task_scheduler))
# Email
from routes.email_routes import setup_email_routes
app.include_router(setup_email_routes())
# Vault
from routes.vault_routes import setup_vault_routes
app.include_router(setup_vault_routes())
# Contacts (CardDAV)
from routes.contacts_routes import setup_contacts_routes
app.include_router(setup_contacts_routes())
# Companion
from companion import setup_companion_routes
app.include_router(setup_companion_routes())
# ========= ROUTES (kept in app.py) =========
def _serve_html_with_nonce(request: Request, file_path: str) -> HTMLResponse:
    """Return the HTML file at *file_path* with ``{{CSP_NONCE}}`` filled in.

    The nonce is read from ``request.state.csp_nonce``; when that attribute is
    absent every placeholder is replaced with an empty string.
    """
    csp_nonce = getattr(request.state, "csp_nonce", "")
    with open(file_path, "r", encoding="utf-8") as handle:
        template = handle.read()
    return HTMLResponse(template.replace("{{CSP_NONCE}}", csp_nonce))
@app.get("/")
async def serve_index(request: Request):
    """Serve the SPA entry page with the CSP nonce injected.

    Looks for ``static/index.html`` under ``BASE_DIR`` first and falls back to
    ``index.html`` directly under ``BASE_DIR``; raises a 404 when neither
    exists.
    """
    for relative in ("static/index.html", "index.html"):
        candidate = abs_join(BASE_DIR, relative)
        if os.path.exists(candidate):
            return _serve_html_with_nonce(request, candidate)
    raise HTTPException(404, "index.html not found")
@app.get("/notes")
async def serve_notes(request: Request):
    """Deep link for Notes: returns the same page as ``/``."""
    page = await serve_index(request)
    return page


@app.get("/calendar")
async def serve_calendar(request: Request):
    """Deep link for Calendar: returns the same page as ``/``."""
    page = await serve_index(request)
    return page
# Per-tool deep-link routes — all serve the same SPA, the JS auto-opens
# the matching modal based on window.location.pathname. Each route also
# gets a unique favicon + page title via inline script in index.html so
# bookmarks render with tool-specific icons.
@app.get("/cookbook")
async def serve_cookbook(request: Request):
return await serve_index(request)
@app.get("/email")
async def serve_email(request: Request):
return await serve_index(request)
@app.get("/memory")
async def serve_memory(request: Request):
    """Deep-link route: return the ``serve_index`` response for ``/memory``."""
    spa_response = await serve_index(request)
    return spa_response
@app.get("/gallery")
async def serve_gallery(request: Request):
    """Deep-link route: return the ``serve_index`` response for ``/gallery``."""
    spa_response = await serve_index(request)
    return spa_response
@app.get("/tasks")
async def serve_tasks(request: Request):
    """Deep-link route: return the ``serve_index`` response for ``/tasks``."""
    spa_response = await serve_index(request)
    return spa_response
@app.get("/library")
async def serve_library(request: Request):
    """Deep-link route: return the ``serve_index`` response for ``/library``."""
    spa_response = await serve_index(request)
    return spa_response
@app.get("/backgrounds")
async def serve_backgrounds(request: Request):
    """Serve ``static/backgrounds.html``, the background-effects sandbox page.

    Used for prototyping background effects. No auth required.
    """
    sandbox_page = abs_join(BASE_DIR, "static/backgrounds.html")
    return _serve_html_with_nonce(request, sandbox_page)
@app.get("/login")
async def serve_login(request: Request):
    """Serve ``static/login.html`` with the CSP nonce placeholder filled in."""
    login_page = abs_join(BASE_DIR, "static/login.html")
    return _serve_html_with_nonce(request, login_page)
@app.get("/api/version")
async def get_version():
    """Report the application version as ``{"version": APP_VERSION}``."""
    # Imported at call time, as in the original handler.
    from core.constants import APP_VERSION as current_version

    payload = {"version": current_version}
    return payload
@app.get("/api/health")
async def health_check() -> Dict[str, str]:
    """Liveness probe: report ``healthy`` together with the current UTC time.

    Returns:
        ``{"status": "healthy", "timestamp": <ISO-8601 UTC time>}``. The
        timestamp is naive (no ``+00:00`` suffix), i.e. the same string
        format ``datetime.utcnow().isoformat()`` produced.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized string keeps its existing shape.
    from datetime import timezone

    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {"status": "healthy", "timestamp": now_utc.isoformat()}
@app.get("/api/ready")
async def readiness_check() -> JSONResponse:
    """Readiness / integrity self-check endpoint.

    Unlike /api/health (liveness), this answers 503 unless the report from
    ``src.readiness.check_readiness`` has a truthy ``ready`` entry, so an
    orchestrator can gate traffic on real readiness. The report itself is
    returned as the JSON body in both cases.

    NOTE(review): the individual checks (described upstream as DB, data dir,
    local-first storage) live in ``check_readiness`` and are not visible here.
    """
    from src.readiness import check_readiness

    report = check_readiness()
    if report.get("ready"):
        http_status = 200
    else:
        http_status = 503
    return JSONResponse(status_code=http_status, content=report)
@app.get("/api/runtime")
async def runtime_info() -> Dict[str, object]:
    """Report whether the process looks containerized and which Ollama URL applies.

    Container detection: ``/.dockerenv`` exists, or ``/proc/1/cgroup``
    mentions ``docker``, ``containerd`` or ``kubepods``. The Ollama URL comes
    from ``OLLAMA_BASE_URL``, then ``OLLAMA_URL``, then a default that
    depends on the detection result.
    """
    containerized = os.path.exists("/.dockerenv")
    if not containerized:
        try:
            with open("/proc/1/cgroup", "r", encoding="utf-8", errors="ignore") as cgroup_file:
                cgroup_text = cgroup_file.read()
        except Exception:
            # Unreadable or missing cgroup file: treat as not containerized.
            containerized = False
        else:
            for marker in ("docker", "containerd", "kubepods"):
                if marker in cgroup_text:
                    containerized = True
                    break

    # Explicit environment configuration wins over the built-in defaults.
    resolved_url = os.getenv("OLLAMA_BASE_URL") or os.getenv("OLLAMA_URL")
    if not resolved_url:
        if containerized:
            resolved_url = "http://host.docker.internal:11434/v1"
        else:
            resolved_url = "http://127.0.0.1:11434/v1"

    return {
        "in_docker": containerized,
        "ollama_base_url": resolved_url,
    }
# ========= LIFECYCLE =========
@asynccontextmanager
async def _lifespan(app):
    """Modern lifespan context manager replacing deprecated @app.on_event.

    Awaits ``_startup_event()`` before yielding control and
    ``_shutdown_event()`` once the context is exited.

    Args:
        app: The application instance passed in by the framework; not used
            by this function.
    """
    # ── STARTUP ──
    # A startup failure propagates as-is; shutdown is not attempted for a
    # startup that never completed.
    await _startup_event()
    try:
        yield
    finally:
        # ── SHUTDOWN ──
        # Kept in ``finally`` so cleanup still runs when an exception or a
        # cancellation is thrown into the generator at the ``yield``.
        await _shutdown_event()


app.router.lifespan_context = _lifespan
async def _startup_event():
global upload_cleanup_task
logger.info("Application starting up...")
webhook_manager.set_loop(asyncio.get_running_loop())
# Wipe any leftover incognito sessions from previous process — they're
# ephemeral by design and must not survive a restart.
try:
from core.database import SessionLocal as _SL, Session as _DbSess, ChatMessage as _DbMsg
_db = _SL()
try:
_ghosts = _db.query(_DbSess).filter(_DbSess.name.in_(("Nobody", "Incognito"))).all()
for _g in _ghosts:
_db.query(_DbMsg).filter(_DbMsg.session_id == _g.id).delete()
_db.delete(_g)
if _ghosts:
_db.commit()
logger.info(f"Purged {len(_ghosts)} leftover incognito session(s)")
finally:
_db.close()
except Exception as e:
logger.debug(f"Incognito purge skipped: {e}")
# Strong refs to fire-and-forget startup tasks. Without this, Python may
# GC tasks created with `asyncio.create_task(...)` before they finish.
_startup_tasks: list[asyncio.Task] = getattr(app.state, "_startup_tasks", [])
app.state._startup_tasks = _startup_tasks
if upload_cleanup_func:
upload_cleanup_task = asyncio.create_task(upload_cleanup_func())
# Always-on monitor that auto-continues the agent when a background bash
# job (#!bg) finishes — re-invokes the turn with the job output.
try:
from src.bg_monitor import start_bg_monitor
_startup_tasks.append(start_bg_monitor())
except Exception as _e:
logger.warning("Failed to start background-job monitor: %s", _e)
# MCP servers can be slow or blocked by local tooling. Connect them after
# the web server is accepting traffic instead of delaying the whole UI.
async def _startup_mcp_connections():
try:
from src.builtin_mcp import register_builtin_servers
await register_builtin_servers(mcp_manager)
except BaseException as e:
logger.warning(f"Built-in MCP registration failed (non-critical): {type(e).__name__}: {e}")
try:
await asyncio.wait_for(mcp_manager.connect_all_enabled(), timeout=20)
except asyncio.TimeoutError:
logger.warning("User MCP startup timed out (non-critical)")
except BaseException as e:
logger.warning(f"MCP startup failed (non-critical): {type(e).__name__}: {e}")
_startup_tasks.append(asyncio.create_task(_startup_mcp_connections()))
# Pre-warm the RAG tool index off the request path. Loading the local
# embedding model + opening ChromaDB + indexing the built-in tools is a
# one-time ~1-3s cost that otherwise lands on the user's FIRST message
# (showing up as a big `tool_selection` time). Doing it here makes the
# first turn as fast as subsequent ones (warm embed ≈ a few ms).
async def _warmup_tool_index():
try:
from src.tool_index import get_tool_index
idx = await asyncio.to_thread(get_tool_index)
if idx:
await asyncio.to_thread(idx.get_tools_for_query, "warmup", 8)
logger.info("[startup] Tool index pre-warmed")
except Exception as e:
logger.warning(f"Tool index warmup failed (non-critical): {type(e).__name__}: {e}")
_startup_tasks.append(asyncio.create_task(_warmup_tool_index()))
# Warmup: ping all known LLM endpoints to prime connections
async def _warmup_endpoints():
try:
import httpx
endpoints = model_discovery.get_endpoints() if model_discovery else []
for ep in endpoints[:5]:
url = ep.get("url", "").replace("/chat/completions", "/models")
if url:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
await client.get(url)
logger.info(f"Warmup ping OK: {url}")
except Exception as e:
logger.debug(f"Warmup ping failed for endpoint: {e}")
except Exception as e:
logger.debug(f"Warmup ping skipped: {e}")
_startup_tasks.append(asyncio.create_task(_warmup_endpoints()))
# Keep-alive: ping endpoints every 60 seconds to prevent cold starts
async def _keepalive_loop():
while True:
try:
await asyncio.sleep(60)
await _warmup_endpoints()
except Exception as e:
logger.warning(f"Keepalive loop error: {e}")
await asyncio.sleep(300) # Back off on error
_startup_tasks.append(asyncio.create_task(_keepalive_loop()))
async def _ensure_default_tasks():
# Create/reconcile default automation tasks + personal assistant for every user.
owners = set()
try:
import json as _json
auth_path = "data/auth.json"
with open(auth_path, encoding="utf-8") as f:
users = _json.load(f).get("users", {})
owners.update(users.keys())
except Exception as e:
logger.debug(f"Default task auth-owner scan: {e}")
# Also reconcile owners already present in scheduled_tasks. This cleans
# up stale/demo/deleted-user built-ins that are no longer in auth.json;
# otherwise their old scheduled rows can keep firing forever.
try:
from core.database import SessionLocal, ScheduledTask
from src.task_scheduler import HOUSEKEEPING_DEFAULTS
builtin_names = []
for defs in HOUSEKEEPING_DEFAULTS.values():
builtin_names.append(defs["name"])
builtin_names.extend(defs.get("legacy_names") or [])
db_seed = SessionLocal()
try:
rows = db_seed.query(ScheduledTask.owner).filter(
(ScheduledTask.action.in_(list(HOUSEKEEPING_DEFAULTS.keys())))
| (ScheduledTask.name.in_(builtin_names))
).distinct().all()
owners.update(row[0] for row in rows if row[0])
finally:
db_seed.close()
except Exception as e:
logger.debug(f"Default task existing-owner scan: {e}")
try:
for uname in sorted(owners):
try:
await task_scheduler.ensure_defaults(uname)
except Exception as e:
logger.debug(f"ensure_defaults({uname}): {e}")
except Exception as e:
logger.debug(f"Default tasks: {e}")
# Reconcile built-in tasks before the runner starts. Otherwise legacy
# scheduled built-ins can fire once before being converted to event tasks.
await _ensure_default_tasks()
# Disk-backed skills are not covered by the DB legacy-owner sweep. Repair
# ownerless or deleted/test-owner SKILL.md files so strict owner filtering
# does not make an existing library look empty after auth/account changes.
try:
import json as _json
auth_path = "data/auth.json"
with open(auth_path, encoding="utf-8") as f:
users = _json.load(f).get("users", {})
primary_owner = None
for uname, udata in users.items():
if udata.get("is_admin") is True:
primary_owner = uname
break
if not primary_owner and users:
primary_owner = next(iter(users))
if primary_owner:
changed = skills_manager.backfill_owner(primary_owner, set(users.keys()))
if changed:
logger.info("Assigned %s legacy skill file(s) to %s", changed, primary_owner)
except Exception as e:
logger.debug(f"Skill owner backfill skipped: {e}")
# Start scheduled task runner — skip when running under a cron-driven
# deployment where an external worker drives task firing. Mirrors
# `ODYSSEUS_INPROCESS_POLLERS` from the email pollers.
_tasks_inprocess = os.environ.get("ODYSSEUS_INPROCESS_TASKS", "1").strip().lower()
if _tasks_inprocess not in ("0", "false", "no", "off", ""):
await task_scheduler.start()
else:
logger.info(
"In-process task scheduler disabled (ODYSSEUS_INPROCESS_TASKS=0); "
"drive task firing externally (e.g. cron)."